Unusual bug, please help me, I can do nothing!!!
Hello, I am a Spark user. I use the "spark-shell.cmd" startup command in Windows cmd. The first startup is normal, but when I use "Ctrl+C" to force-close the Spark window, it cannot start normally again. The error message is as follows: "Failed to initialize Spark session. org.apache.spark.SparkException: Invalid Spark URL: spark://HeartbeatReceiver@x.168.137.41:49963". When I add "x.168.137.41" to 'etc/hosts' it works fine; then I use "Ctrl+C" again, and again it cannot start normally. Please help me.
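I am also wondering whether pinning the hostname before launching the shell would avoid the bad URL. This is only a guess on my part (SPARK_LOCAL_HOSTNAME is the environment variable Spark consults for its local hostname), and I have not verified it on this machine:

set SPARK_LOCAL_HOSTNAME=localhost
spark-shell.cmd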
Connection Reset by Peer: failed to remove cached RDD
Hi Team, We are facing an issue in production where we frequently get "Still have 1 request outstanding when connection with the hostname was closed" / "connection reset by peer" errors, as well as warnings "failed to remove cached rdd" and "failed to remove broadcast variable". Please help us figure out how to mitigate this. Executor memory: 12g. Network timeout: 60. Heartbeat interval: 25.
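For reference, this is how we pass those settings today, in spark-submit form (the unit suffixes are my assumption; we set the raw numbers above). Since spark.executor.heartbeatInterval has to stay well below spark.network.timeout, raising the timeout is the first mitigation we plan to try:

--conf spark.executor.memory=12g
--conf spark.network.timeout=60s
--conf spark.executor.heartbeatInterval=25s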
Re: Application Timeout
Jacek, Turns out that this was the RPC connection to the master (7077) from the driver closing. We had Istio closing it, as there was a silly one-hour idle timeout setting on their side. I was able to re-create this by running lsof on the driver for port 7077 and then killing that process. After this, I would see the application marked as "finished". The fix was to exclude port 7077 on the Istio sidecar... it only took me over 6 months to figure this out, so I wanted to share. :) On Thu, Jan 21, 2021 at 5:39 AM Jacek Laskowski wrote: > Hi Brett, > > No idea why it happens, but got curious about this "Cores" column being 0. > Is this always the case? > > Pozdrawiam, > Jacek Laskowski > > https://about.me/JacekLaskowski > "The Internals Of" Online Books <https://books.japila.pl/> > Follow me on https://twitter.com/jaceklaskowski > > <https://twitter.com/jaceklaskowski> > > > On Tue, Jan 19, 2021 at 11:27 PM Brett Spark > wrote: > >> Hello! >> When using Spark Standalone & Spark 2.4.4 / 3.0.0 - we are seeing our >> standalone Spark "applications" time out and show as "Finished" after around >> an hour. >> >> Here is a screenshot from the Spark master before it's marked as finished. >> [image: image.png] >> Here is a screenshot from the Spark master after it's marked as finished >> (after over an hour of idle time). >> [image: image.png] >> Here are the logs from the Spark Master / Worker: >> >> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master >> 2021-01-19 21:55:47,282 INFO master.Master: 172.32.3.66:34570 got >> disassociated, removing it. >> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master >> 2021-01-19 21:55:52,095 INFO master.Master: 172.32.115.115:36556 got >> disassociated, removing it. >> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master >> 2021-01-19 21:55:52,095 INFO master.Master: 172.32.115.115:37305 got >> disassociated, removing it. >> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master >> 2021-01-19 21:55:52,096 INFO master.Master: Removing app >> app-20210119204911- >> spark-worker-2d733568b2a7e82de7b2b09b6daa17e9-7bbb75f9b6-8mv2b worker >> 2021-01-19 21:55:52,112 INFO shuffle.ExternalShuffleBlockResolver: >> Application app-20210119204911- removed, cleanupLocalDirs = true >> >> Is there a setting that causes an application to time out after an hour of >> a Spark application or Spark worker being idle? >> >> I would like to keep our Spark applications alive as long as possible. >> >> I haven't been able to find a setting in the Spark configuration documentation >> that corresponds to this, so I'm wondering if this is something that's hard >> coded. >> >> Please let me know, >> Thank you! >
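For anyone who hits the same thing: the change amounted to adding Istio's port-exclusion annotation to the driver pod, along these lines (a sketch; the annotation name is the standard Istio traffic one, and the port is whatever your master URL uses):

traffic.sidecar.istio.io/excludeOutboundPorts: "7077"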
Application Timeout
Hello! When using Spark Standalone & Spark 2.4.4 / 3.0.0 - we are seeing our standalone Spark "applications" time out and show as "Finished" after around an hour. Here is a screenshot from the Spark master before it's marked as finished. [image: image.png] Here is a screenshot from the Spark master after it's marked as finished (after over an hour of idle time). [image: image.png] Here are the logs from the Spark Master / Worker: spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master 2021-01-19 21:55:47,282 INFO master.Master: 172.32.3.66:34570 got disassociated, removing it. spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master 2021-01-19 21:55:52,095 INFO master.Master: 172.32.115.115:36556 got disassociated, removing it. spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master 2021-01-19 21:55:52,095 INFO master.Master: 172.32.115.115:37305 got disassociated, removing it. spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master 2021-01-19 21:55:52,096 INFO master.Master: Removing app app-20210119204911- spark-worker-2d733568b2a7e82de7b2b09b6daa17e9-7bbb75f9b6-8mv2b worker 2021-01-19 21:55:52,112 INFO shuffle.ExternalShuffleBlockResolver: Application app-20210119204911- removed, cleanupLocalDirs = true Is there a setting that causes an application to time out after an hour of a Spark application or Spark worker being idle? I would like to keep our Spark applications alive as long as possible. I haven't been able to find a setting in the Spark configuration documentation that corresponds to this, so I'm wondering if this is something that's hard-coded. Please let me know, Thank you!
Spark stable release for Hadoop 3
Hello, We are considering whether to use Hadoop or Kubernetes as the cluster manager for Spark. We would prefer Hadoop 3 because of its native support for scheduling GPUs. Although there is a Spark 3.0.0 preview2 version available that is pre-built for Hadoop 3, I would like to know the estimated date for a stable Spark release for Hadoop 3. Thank you, Piper
Re: writing into oracle database is very slow
Hi Jiang, I was facing the very same issue; the solution is to write to a file and use an Oracle external table to do the insert. Hope this helps. Dalin On Thu, Apr 18, 2019 at 11:43 AM Jörn Franke wrote: > What is the size of the data? How much time does it need on HDFS and how > much on Oracle? How many partitions do you have on the Oracle side? > > On 06.04.2019 at 16:59, Lian Jiang wrote: > > Hi, > > My spark job writes into oracle db using: > > df.coalesce(10).write.format("jdbc").option("url", url) > .option("driver", driver).option("user", user) > .option("batchsize", 2000) > .option("password", password).option("dbtable", > tableName).mode("append").save() > > It is much slower than writing into HDFS. The data to write is small. > > Is this expected? Thanks for any clue. > > >
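P.S. If you stay on JDBC, a sketch of the write with the knobs that usually matter for Oracle throughput (the option names are from the Spark JDBC data source; the values here are illustrative, not tested against your schema):

df.repartition(32) // more concurrent inserts than coalesce(10)
  .write.format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("user", user)
  .option("password", password)
  .option("dbtable", tableName)
  .option("batchsize", 10000)  // rows per JDBC batch insert; the default is 1000
  .option("numPartitions", 32) // upper bound on parallel connections
  .mode("append")
  .save()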
Re: Hive to Oracle using Spark - Type(Date) conversion issue
Use unix time and write the unix time to Oracle as a number column type; create a virtual column in the Oracle database for the unix time, like "oracle_time generated always as (to_date('1970010108','YYYYMMDDHH24')+(1/24/60/60)*unixtime)". > On Mar 20, 2018, at 11:08 PM, Gurusamy Thirupathy wrote: > > Hi Jorn, > > Thanks for sharing the different options; yes, we are trying to build a > generic tool for Hive to Oracle export using Spark. > FYI, currently we are using sqoop; we are trying to migrate from sqoop to > spark. > > Thanks > -G > > On Tue, Mar 20, 2018 at 2:17 AM, Jörn Franke <mailto:jornfra...@gmail.com>> wrote: > Write your own Spark UDF. Apply it to all varchar columns. > > Within this udf you can use the SimpleDateFormat parse method. If this method > returns null you return the content as varchar, if not you return a date. If > the content is null you return null. > > Alternatively you can define an insert function as pl/sql on the Oracle side. > > Another alternative is to read the Oracle metadata for the table at runtime > and then adapt your conversion based on this. > > However, this may not be perfect depending on your use case. Can you please > provide more details/examples? Do you aim at a generic hive to Oracle import > tool using Spark? Sqoop would not be an alternative? > > On 20. Mar 2018, at 03:45, Gurusamy Thirupathy <mailto:thirug...@gmail.com>> wrote: > >> Hi guha, >> >> Thanks for your quick response; options a and b are in our table already. For >> option b, again the same problem: we don't know which column is a date. >> >> >> Thanks, >> -G >> >> On Sun, Mar 18, 2018 at 9:36 PM, Deepak Sharma > <mailto:deepakmc...@gmail.com>> wrote: >> The other approach would be to write to a temp table and then merge the data. >> But this may be an expensive solution. >> >> Thanks >> Deepak >> >> On Mon, Mar 19, 2018, 08:04 Gurusamy Thirupathy > <mailto:thirug...@gmail.com>> wrote: >> Hi, >> >> I am trying to read data from Hive as a DataFrame, then trying to write the DF >> into the Oracle database. In this case, the date field/column in hive is >> of type Varchar(20), >> but the corresponding column type in Oracle is Date. While reading from hive, >> the hive table names are dynamically decided (read from another table) >> based on some job condition (ex. Job1). There are multiple tables like this, >> so the column and table names are decided only at run time. So I can't do type >> conversion explicitly when reading from Hive. >> >> So is there any utility/api available in Spark to achieve this conversion >> issue? >> >> >> Thanks, >> Guru >> >> >> >> -- >> Thanks, >> Guru > > > > -- > Thanks, > Guru
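For the UDF route Jörn describes above, a sketch (the source format string is an assumption about the varchar content; and since a Spark column must hold a single type, this version targets an Oracle Date column, returning null on unparseable values rather than falling back to varchar):

import java.sql.Date
import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.udf

val asDateOrNull = udf { (s: String) =>
  if (s == null) null
  else try {
    // SimpleDateFormat is not thread-safe, so construct it per invocation
    val fmt = new SimpleDateFormat("yyyy-MM-dd") // assumed source format
    fmt.setLenient(false)
    new Date(fmt.parse(s).getTime)
  } catch { case _: java.text.ParseException => null }
}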
Re: [Structured Streaming] More than 1 streaming in a code
Hi Panagiotis, I am wondering whether you solved the problem or not, because I met the same issue today. I'd appreciate it very much if you could paste the code snippet if it's working. Thanks. > On Apr 6, 2018, at 7:40 AM, Aakash Basu <aakash.spark@gmail.com> wrote: > > Hi Panagiotis, > > I did that, but it still prints the result of the first query and waits for > new data, and doesn't even go to the next one. > > Data - > > $ nc -lk 9998 > > 1,2 > 3,4 > 5,6 > 7,8 > > Result - > > --- > Batch: 0 > --- > ++ > |aver| > ++ > | 3.0| > ++ > > --- > Batch: 1 > --- > ++ > |aver| > ++ > | 4.0| > ++ > > > Updated Code - > from pyspark.sql import SparkSession > from pyspark.sql.functions import split > > spark = SparkSession \ > .builder \ > .appName("StructuredNetworkWordCount") \ > .getOrCreate() > > data = spark \ > .readStream \ > .format("socket") \ > .option("header","true") \ > .option("host", "localhost") \ > .option("port", 9998) \ > .load("csv") > > > id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), > split(data.value, ",").getItem(1).alias("col2")) > > id_DF.createOrReplaceTempView("ds") > > df = spark.sql("select avg(col1) as aver from ds") > > df.createOrReplaceTempView("abcd") > > wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 > from ds") # (select aver from abcd) > > query2 = df \ > .writeStream \ > .format("console") \ > .outputMode("complete") \ > .trigger(processingTime='5 seconds') \ > .start() > > query = wordCounts \ > .writeStream \ > .format("console") \ > .trigger(processingTime='5 seconds') \ > .start() > > spark.streams.awaitAnyTermination() > > > Thanks, > Aakash. > > On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <panga...@gmail.com > <mailto:panga...@gmail.com>> wrote: > Hello Aakash, > > When you use query.awaitTermination you are pretty much blocking there, > waiting for the current query to stop or throw an exception. In your case the > second query will not even start. > What you could do instead is remove all the blocking calls and use > spark.streams.awaitAnyTermination instead (waiting for either query1 or > query2 to terminate). Make sure you do that after the query2.start call. > > I hope this helps. > > Cheers, > Panagiotis > > On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark@gmail.com > <mailto:aakash.spark@gmail.com>> wrote: > Any help? > > Need urgent help. Someone please clarify the doubt? > > -- Forwarded message -- > From: Aakash Basu <aakash.spark@gmail.com > <mailto:aakash.spark@gmail.com>> > Date: Thu, Apr 5, 2018 at 3:18 PM > Subject: [Structured Streaming] More than 1 streaming in a code > To: user <user@spark.apache.org <mailto:user@spark.apache.org>> > > > Hi, > > If I have more than one writeStream in a code, which operates on the same > readStream data, why does it produce only the first writeStream? I want the > second one to be also printed on the console. > > How to do that? 
> > from pyspark.sql import SparkSession > from pyspark.sql.functions import split, col > > class test: > > > spark = SparkSession.builder \ > .appName("Stream_Col_Oper_Spark") \ > .getOrCreate() > > data = spark.readStream.format("kafka") \ > .option("startingOffsets", "latest") \ > .option("kafka.bootstrap.servers", "localhost:9092") \ > .option("subscribe", "test1") \ > .load() > > ID = data.select('value') \ > .withColumn('value', data.value.cast("string")) \ > .withColumn("Col1", split(col("value"), ",").getItem(0)) \ > .withColumn("Col2", split(col("value"), ",").getItem(1)) \ > .drop('value') > > ID.createOrReplaceTempView("transformed_Stream_DF") > > df = spark.sql("select avg(col1) as aver from transformed_Stream_DF") > > df.
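For reference, the shape of the fix Panagiotis describes, reduced to a minimal sketch (df and wordCounts stand in for the two derived streaming DataFrames from Aakash's code above):

val query2 = df.writeStream.format("console").outputMode("complete").start()
val query = wordCounts.writeStream.format("console").start()
// block on ANY query; never call query2.awaitTermination() before query.start()
spark.streams.awaitAnyTermination()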
spark streaming kafka not displaying data in local eclipse
Hi, I have a simple Java program to read data from kafka using spark streaming. When I run it from Eclipse on my mac, it connects to the zookeeper and bootstrap nodes, but it does not display any data, and it does not give any error. It just shows: 18/01/16 20:49:15 INFO Executor: Finished task 96.0 in stage 0.0 (TID 0). 1412 bytes result sent to driver 18/01/16 20:49:15 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 2, localhost, partition 1, ANY, 5832 bytes) 18/01/16 20:49:15 INFO Executor: Running task 1.0 in stage 0.0 (TID 2) 18/01/16 20:49:15 INFO TaskSetManager: Finished task 96.0 in stage 0.0 (TID 0) in 111 ms on localhost (1/97) 18/01/16 20:49:15 INFO KafkaRDD: Computing topic data_stream, partition 16 offsets 25624028 -> 25624097 18/01/16 20:49:15 INFO VerifiableProperties: Verifying properties 18/01/16 20:49:15 INFO VerifiableProperties: Property auto.offset.reset is overridden to largest 18/01/16 20:49:15 INFO VerifiableProperties: Property fetch.message.max.bytes is overridden to 20971520 18/01/16 20:49:15 INFO VerifiableProperties: Property group.id is overridden to VR-Test-Group 18/01/16 20:49:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to zk.kafka-cluster...:8091 18/01/16 20:49:25 INFO JobScheduler: Added jobs for time 151616456 ms 18/01/16 20:49:36 INFO JobScheduler: Added jobs for time 151616457 ms 18/01/16 20:49:45 INFO JobScheduler: Added jobs for time 151616458 ms 18/01/16 20:49:55 INFO JobScheduler: Added jobs for time 151616459 ms 18/01/16 20:50:07 INFO JobScheduler: Added jobs for time 151616460 ms 18/01/16 20:50:15 INFO JobScheduler: Added jobs for time 151616461 ms But when I export it as a jar and run it on a remote spark cluster, it does display the actual data. Please suggest what could be wrong. thanks VR
DataFrame joins with Spark-Java
Dear Friends, I am new to the Spark DataFrame API. My requirement is: I have a dataframe1 that contains today's records and a dataframe2 that contains yesterday's records. I need to compare today's records with yesterday's records and find the new records which do not exist in yesterday's records, based on the primary key column. Here, the problem is that sometimes there are multiple columns forming the primary key. I receive the primary key columns in a List. example: List primaryKeyList = listOfPrimarykeys; // single or multiple primary key columns DataFrame currentDataRecords = queryexecutor.getCurrentRecords(); // this contains today's records DataFrame yesterdayRecords = queryexecutor.getYesterdayRecords(); // this contains yesterday's records Can anyone help me with how to join these two dataframes and apply WHERE conditions on columns dynamically with Spark Java code? Thanks Sushma
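One way this is commonly done is a left_anti join on the dynamic key list; a sketch in Scala (the Java DataFrame API has the same join(other, usingColumns, joinType) method; left_anti is available in Spark 2.x):

import scala.collection.JavaConverters._

val joinCols: Seq[String] = primaryKeyList.asScala // single or multiple key columns
// rows of today's data with no matching key in yesterday's data
val newRecords = currentDataRecords.join(yesterdayRecords, joinCols, "left_anti")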
Re: Reload some static data during struct streaming
I need it cached to improve throughput; I only hope it can be refreshed once a day, not every batch (see the sketch after this message for what I have in mind). > On Nov 13, 2017, at 4:49 PM, Burak Yavuz <brk...@gmail.com> wrote: > > I think if you don't cache the jdbc table, then it should auto-refresh. > > On Mon, Nov 13, 2017 at 1:21 PM, spark receiver <spark.recei...@gmail.com > <mailto:spark.recei...@gmail.com>> wrote: > Hi > > I’m using Structured Streaming (Spark 2.2) to receive Kafka messages; it works great. > The thing is I need to join the Kafka message with a relatively static table > stored in a mysql database (let’s call it metadata here). > > So is it possible to reload the metadata table after some time interval (like > daily) without restarting the running streaming query? > > Snippet code as follows: > // df_meta contains important information to join with the dataframe read > from kafka > val df_meta = spark.read.format("jdbc").option("url", > mysql_url).option("dbtable", "v_entity_ap_rel").load() > df_meta.cache() > val df = spark.readStream > .format("kafka") > .option("kafka.bootstrap.servers", > "x.x.x.x:9092").option("fetch.message.max.bytes", > "5000").option("kafka.max.partition.fetch.bytes", "5000") > .option("subscribe", "rawdb.raw_data") > .option("failOnDataLoss", true) > .option("startingOffsets", "latest") > .load() > .select($"value".as[Array[Byte]]) > .map(avroDeserialize(_)) > .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime") > .join(df_meta.as("b"), $"a.apmac" === $"b.apmac") > > df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi > then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", > "substring(stime,1,13) STIME_HOUR") > .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR") > .option("checkpointLocation", > "/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes")) > .start("T_CF_TABLE") > .awaitTermination() > > Mason >
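What I have in mind, as a sketch (not yet verified): keep df_meta cached for throughput, but once a day drop and lazily rebuild the cache, so that the next micro-batch re-reads MySQL while every other batch hits the cache:

import java.util.concurrent.{Executors, TimeUnit}

val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
  def run(): Unit = {
    df_meta.unpersist(true) // drop the day-old snapshot (blocking)
    df_meta.cache()         // repopulated lazily on the next micro-batch
  }
}, 24, 24, TimeUnit.HOURS)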
Reload some static data during struct streaming
Hi I’m using Structured Streaming (Spark 2.2) to receive Kafka messages; it works great. The thing is I need to join the Kafka message with a relatively static table stored in a mysql database (let’s call it metadata here). So is it possible to reload the metadata table after some time interval (like daily) without restarting the running streaming query? Snippet code as follows: // df_meta contains important information to join with the dataframe read from kafka val df_meta = spark.read.format("jdbc").option("url", mysql_url).option("dbtable", "v_entity_ap_rel").load() df_meta.cache() val df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "x.x.x.x:9092").option("fetch.message.max.bytes", "5000").option("kafka.max.partition.fetch.bytes", "5000") .option("subscribe", "rawdb.raw_data") .option("failOnDataLoss", true) .option("startingOffsets", "latest") .load() .select($"value".as[Array[Byte]]) .map(avroDeserialize(_)) .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime") .join(df_meta.as("b"), $"a.apmac" === $"b.apmac") df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", "substring(stime,1,13) STIME_HOUR") .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR") .option("checkpointLocation", "/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes")) .start("T_CF_TABLE") .awaitTermination() Mason
Re: Driver hung and ran out of memory while writing to console progress bar
How much memory have you allocated to the driver? The driver stores some state for tracking the task, stage and job history that you can see in the Spark console; it does take up a significant portion of the heap, anywhere from 200MB - 1G, depending on your map reduce steps. Either way, that is a good place to start: check how much memory you have allocated to the driver. If it is sufficient, on the order of 2-3G+ at least, then you will have to take heap dumps of the driver process periodically and find out what objects grow over time. On Fri, Feb 10, 2017 at 9:34 AM, Ryan Blue <rb...@netflix.com.invalid> wrote: > This isn't related to the progress bar, it just happened while in that > section of code. Something else is taking memory in the driver, usually a > broadcast table or something else that requires a lot of memory and happens > on the driver. > > You should check your driver memory settings and the query plan (if this > was SparkSQL) for this stage to investigate further. > > rb > > On Thu, Feb 9, 2017 at 8:41 PM, John Fang <xiaojian....@alibaba-inc.com> > wrote: > >> the spark version is 2.1.0 >> >> -- >> From: Fang Xiaojian (Xuandi) <xiaojian@alibaba-inc.com> >> Sent: Friday, February 10, 2017, 12:35 >> To: spark-dev <d...@spark.apache.org>; spark-user <user@spark.apache.org> >> Subject: Driver hung and ran out of memory while writing to console >> progress bar >> >> [Stage 172:==> (10328 + 93) / 16144][Stage 172:==> (10329 + 93) / 16144] ... (the same console progress-bar fragment repeats, the task counter climbing from 10330 up to 10359 of 16144, until the message truncates)
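For the periodic heap dumps, the standard JDK tooling is enough; something along these lines against the driver JVM's pid, repeated every 30-60 minutes and compared across snapshots:

jmap -dump:live,format=b,file=driver-heap.hprof <driver-pid>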
Re: Question about best Spark tuning
My take on the 2-3 tasks per CPU core is that you want to ensure you are utilizing the cores to the max, which helps with scaling and performance. The question would be: why not 1 task per core? The reason is that you can probably get a good handle on the average execution time per task, but the execution time at p90+ can be spiky. In that case you don't want the long-pole task(s) to slow down your entire batch (which is in general what you would tune your application for). So by having 2-3 tasks per CPU core, you break the work down into smaller chunks, hence completing tasks quicker, and let the spark scheduler (which is low cost and efficient based on my observation; it is never the bottleneck) distribute the tasks among the cores. I have experimented with 1 task per core, 2-3 tasks per core, and all the way up to 20+ tasks per core. The performance was similar between 3 tasks per core and 20+ tasks per core. But it does make a difference in performance when you compare 1 task per core vs 2-3 tasks per core. Hope this explanation makes sense (see the sketch after this message). Best, Bharath On Thu, Feb 9, 2017 at 2:11 PM, Ji Yan <ji...@drive.ai> wrote: > Dear spark users, > > From this site https://spark.apache.org/docs/latest/tuning.html where it > offers recommendation on setting the level of parallelism > > Clusters will not be fully utilized unless you set the level of >> parallelism for each operation high enough. Spark automatically sets the >> number of “map” tasks to run on each file according to its size (though you >> can control it through optional parameters to SparkContext.textFile, >> etc), and for distributed “reduce” operations, such as groupByKey and >> reduceByKey, it uses the largest parent RDD’s number of partitions. You >> can pass the level of parallelism as a second argument (see the >> spark.PairRDDFunctions >> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions> >> documentation), or set the config property spark.default.parallelism to >> change the default. *In general, we recommend 2-3 tasks per CPU core in >> your cluster*. > > > Do people have a general theory/intuition about why it is a good idea to > have 2-3 tasks running per CPU core? > > Thanks > Ji >
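As a concrete sketch of the rule of thumb (the cluster numbers here are illustrative, not from a real deployment):

val executors = 50
val coresPerExecutor = 4
// 2-3 tasks per core => default parallelism of 2-3 x total cores
val conf = new org.apache.spark.SparkConf()
  .set("spark.default.parallelism", (executors * coresPerExecutor * 3).toString)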
Re: Is it better to use Java or Python or Scala for Spark for big data sets
Spark has more support for Scala, by which I mean more APIs are available for Scala compared to Python or Java. Also, Scala code will be more concise and easier to read. Java is very verbose. On Thu, Feb 9, 2017 at 10:21 PM, Irving Duran <irving.du...@gmail.com> wrote: > I would say Java, since it will be somewhat similar to Scala. Now, this > assumes that you have some app already written in Scala. If you don't, then > pick the language that you feel most comfortable with. > > Thank you, > > Irving Duran > > On Feb 9, 2017, at 11:59 PM, nancy henry <nancyhenry6...@gmail.com> wrote: > > Hi All, > > Is it better to use Java or Python or Scala for Spark coding? > > Mainly my work is with getting file data which is in csv format, and I > have to do some rule checking and rule aggregation > > and put the final filtered data back to oracle so that real time apps can > use it. >
Re: Performance bug in UDAF?
Pinging again on this topic. Is there an easy way to select TopN in a RelationalGroupedDataset? Basically, in the example below, dataSet.groupBy("Column1").agg(udaf("Column2", "Column3")) returns a RelationalGroupedDataset. One way to address the data skew would be to reduce the data per key (Column1 being the key here). And if we are interested in the TopN values per column (like Column2, Column3), how can we get TopN from a RelationalGroupedDataset? Is the only way to get TopN to implement it in the UDAF? Would appreciate any pointers or examples if someone has solved a similar problem (see also the sketch after this message). Thanks, Bharath On Mon, Oct 31, 2016 at 11:40 AM, Spark User <sparkuser2...@gmail.com> wrote: > Trying again. Hoping to find some help in figuring out the performance > bottleneck we are observing. > > Thanks, > Bharath > > On Sun, Oct 30, 2016 at 11:58 AM, Spark User <sparkuser2...@gmail.com> > wrote: > >> Hi All, >> >> I have a UDAF that seems to perform poorly when its input is skewed. I >> have been debugging the UDAF implementation but I don't see any code that >> is causing the performance to degrade. More details on the data and the >> experiments I have run. >> >> DataSet: Assume 3 columns, column1 being the key. >> Column1 Column2 Column3 >> a 1 x >> a 2 x >> a 3 x >> a 4 x >> a 5 x >> a 6 z >> 5 million rows for a >> >> a 100 y >> b 9 y >> b 9 y >> b 10 y >> 3 million rows for b >> ... >> more rows >> total rows is 100 million >> >> >> a has 5 million rows. Column2 for a has 1 million unique values. >> b has 3 million rows. Column2 for b has 80 unique values. >> >> Column 3 has just 100s of unique values, not in the order of millions, for >> both a and b. >> >> Say totally there are 100 million rows as the input to a UDAF >> aggregation. And the skew in the data is for the keys a and b. All other rows >> can be ignored and do not cause any performance issue / hot partitions. >> >> The code does a dataSet.groupBy("Column1").agg(udaf("Column2", >> "Column3")). >> >> I commented out the UDAF implementation for the update and merge methods, so >> essentially the UDAF was doing nothing. >> >> With this code (empty update and merge for UDAF) the performance for a >> micro-batch is 16 minutes per micro-batch, the micro-batch containing 100 >> million rows, with 5 million rows for a and 1 million unique values for >> Column2 for a. >> >> But when I pass empty values for Column2 with nothing else changed, >> effectively reducing the 1 million unique values for Column2 to just 1 >> unique value (the empty value), the batch processing time goes down to 4 minutes. >> >> So I am trying to understand why there is such a big performance >> difference. What in the UDAF causes the processing time to increase by orders >> of magnitude when there is a skew in the data as observed above? >> >> Any insight from spark developers, contributors, or anyone else who has a >> deeper understanding of UDAF would be helpful. >> >> Thanks, >> Bharath >> >> >> >
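For what it's worth, the approach we are currently leaning towards (a standard window-function sketch, not UDAF-specific; N = 10 here is arbitrary):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window.partitionBy("Column1").orderBy(col("Column2").desc)
val topN = dataSet
  .withColumn("rn", row_number().over(w)) // rank rows within each key
  .filter(col("rn") <= 10)                // keep top 10 Column2 values per Column1
  .drop("rn")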
Re: convert local tsv file to orc file on distributed cloud storage (openstack)
Hi, The source file I have is on the local machine, and it's pretty huge, like 150 GB. How do I go about it? On Sun, Nov 20, 2016 at 8:52 AM, Steve Loughran <ste...@hortonworks.com> wrote: > > On 19 Nov 2016, at 17:21, vr spark <vrspark...@gmail.com> wrote: > > Hi, > I am looking for scala or python code samples to convert a local tsv file to > an orc file and store it on distributed cloud storage (openstack). > > So, I need these 3 samples. Please suggest. > > 1. read tsv > 2. convert to orc > 3. store on distributed cloud storage > > > thanks > VR > > > all options, 9 lines of code, assuming a spark context has already been > set up with the permissions to write to AWS, and the relevant JARs for S3A > to work on the CP. The read operation is inefficient as to determine the > schema it scans the (here, remote) file twice; that may be OK for an > example, but I wouldn't do that in production. The source is a real file > belonging to amazon; dest a bucket of mine. > > More details at: http://www.slideshare.net/steve_l/apache-spark-and-object-stores > > > val csvdata = spark.read.options(Map( > "header" -> "true", > "ignoreLeadingWhiteSpace" -> "true", > "ignoreTrailingWhiteSpace" -> "true", > "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ", > "inferSchema" -> "true", > "mode" -> "FAILFAST")) > .csv("s3a://landsat-pds/scene_list.gz") > csvdata.write.mode("overwrite").orc("s3a://hwdev-stevel-demo2/landsatOrc") >
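For the tsv-specific part of the question, the same reader takes a tab separator, and a local source can be addressed with a file:// URI. A sketch (the paths and bucket are illustrative; with a 150 GB input I would pass an explicit schema rather than inferSchema, which scans the file an extra time):

val tsv = spark.read
  .option("sep", "\t")           // tab-separated input
  .option("header", "true")
  .option("inferSchema", "true") // prefer .schema(...) for a 150 GB file
  .csv("file:///data/input.tsv")

tsv.write.mode("overwrite").orc("s3a://my-bucket/output-orc")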
Potential memory leak in yarn ApplicationMaster
Hi All, It seems like the heap usage for org.apache.spark.deploy.yarn.ApplicationMaster keeps growing continuously. The driver crashes with OOM eventually. More details: I have a spark streaming app that runs on spark-2.0. The spark.driver.memory is 10G and spark.yarn.driver.memoryOverhead is 2048. Looking at driver heap dumps taken every 30 mins, the heap usage for org.apache.spark.deploy.yarn.ApplicationMaster grows by 100MB every 30 mins. Also, I suspect it may be caused by having set the below to true (which I think is the default): --conf spark.dynamicAllocation.enabled=true \ --conf spark.shuffle.service.enabled=true \ I am now trying with them set to false, to check whether the heap usage for ApplicationMaster stops increasing. By investigating the heap dump and looking at the code for ApplicationMaster, it seems like the heap usage is growing because of the releasedExecutorLossReasons HashMap in https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala#L124 Has anyone else seen this issue before? Thanks, Bharath
convert local tsv file to orc file on distributed cloud storage (openstack)
Hi, I am looking for scala or python code samples to convert a local tsv file to an orc file and store it on distributed cloud storage (openstack). So, I need these 3 samples. Please suggest. 1. read tsv 2. convert to orc 3. store on distributed cloud storage thanks VR
Re: Performance bug in UDAF?
Trying again. Hoping to find some help in figuring out the performance bottleneck we are observing. Thanks, Bharath On Sun, Oct 30, 2016 at 11:58 AM, Spark User <sparkuser2...@gmail.com> wrote: > Hi All, > > I have a UDAF that seems to perform poorly when its input is skewed. I > have been debugging the UDAF implementation but I don't see any code that > is causing the performance to degrade. More details on the data and the > experiments I have run. > > DataSet: Assume 3 columns, column1 being the key. > Column1 Column2 Column3 > a 1 x > a 2 x > a 3 x > a 4 x > a 5 x > a 6 z > 5 million rows for a > > a 100 y > b 9 y > b 9 y > b 10 y > 3 million rows for b > ... > more rows > total rows is 100 million > > > a has 5 million rows. Column2 for a has 1 million unique values. > b has 3 million rows. Column2 for b has 80 unique values. > > Column 3 has just 100s of unique values, not in the order of millions, for > both a and b. > > Say totally there are 100 million rows as the input to a UDAF aggregation. > And the skew in the data is for the keys a and b. All other rows can be ignored > and do not cause any performance issue / hot partitions. > > The code does a dataSet.groupBy("Column1").agg(udaf("Column2", > "Column3")). > > I commented out the UDAF implementation for the update and merge methods, so > essentially the UDAF was doing nothing. > > With this code (empty update and merge for UDAF) the performance for a > micro-batch is 16 minutes per micro-batch, the micro-batch containing 100 > million rows, with 5 million rows for a and 1 million unique values for > Column2 for a. > > But when I pass empty values for Column2 with nothing else changed, > effectively reducing the 1 million unique values for Column2 to just 1 > unique value (the empty value), the batch processing time goes down to 4 minutes. > > So I am trying to understand why there is such a big performance > difference. What in the UDAF causes the processing time to increase by orders > of magnitude when there is a skew in the data as observed above? > > Any insight from spark developers, contributors, or anyone else who has a > deeper understanding of UDAF would be helpful. > > Thanks, > Bharath > > >
Performance bug in UDAF?
Hi All, I have a UDAF that seems to perform poorly when its input is skewed. I have been debugging the UDAF implementation but I don't see any code that is causing the performance to degrade. More details on the data and the experiments I have run. DataSet: Assume 3 columns, column1 being the key. Column1 Column2 Column3 a 1 x a 2 x a 3 x a 4 x a 5 x a 6 z 5 million rows for a a 100 y b 9 y b 9 y b 10 y 3 million rows for b ... more rows total rows is 100 million a has 5 million rows. Column2 for a has 1 million unique values. b has 3 million rows. Column2 for b has 80 unique values. Column 3 has just 100s of unique values, not in the order of millions, for both a and b. Say totally there are 100 million rows as the input to a UDAF aggregation. And the skew in the data is for the keys a and b. All other rows can be ignored and do not cause any performance issue / hot partitions. The code does a dataSet.groupBy("Column1").agg(udaf("Column2", "Column3")). I commented out the UDAF implementation for the update and merge methods, so essentially the UDAF was doing nothing. With this code (empty update and merge for UDAF) the performance for a micro-batch is 16 minutes per micro-batch, the micro-batch containing 100 million rows, with 5 million rows for a and 1 million unique values for Column2 for a. But when I pass empty values for Column2 with nothing else changed, effectively reducing the 1 million unique values for Column2 to just 1 unique value (the empty value), the batch processing time goes down to 4 minutes. So I am trying to understand why there is such a big performance difference. What in the UDAF causes the processing time to increase by orders of magnitude when there is a skew in the data as observed above? Any insight from spark developers, contributors, or anyone else who has a deeper understanding of UDAF would be helpful. Thanks, Bharath
RDD to Dataset results in fixed number of partitions
Hi All, I'm trying to create a Dataset from an RDD and do a groupBy on the Dataset. The groupBy stage runs with 200 partitions, although the RDD had 5000 partitions. I also seem to have no way to change those 200 partitions on the Dataset to some other large number. This seems to be affecting the parallelism, as there are 700 executors and only 200 partitions. The code looks somewhat like: val sqsDstream = sparkStreamingContext.union((1 to 3).map(_ => sparkStreamingContext.receiverStream(new SQSReceiver()))).transform(_.repartition(5000)) sqsDstream.foreachRDD(rdd => { val dataSet = sparkSession.createDataset(rdd) val aggregatedDataset: Dataset[Row] = dataSet.groupBy("primaryKey").agg(udaf("key1")) aggregatedDataset.foreachPartition(partition => { //write to output stream }) }) Any pointers would be appreciated. Thanks, Bharath
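In case it helps whoever answers: my understanding is that the 200 comes from spark.sql.shuffle.partitions (default 200), which Dataset/DataFrame aggregations use for their shuffle regardless of the input RDD's partitioning. A sketch of what I am about to try:

// raise the shuffle partition count used by Dataset groupBy
sparkSession.conf.set("spark.sql.shuffle.partitions", "5000")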
receiving stream data options
Hi, I have a continuous REST API stream which keeps spitting out data in the form of JSON. I access the stream using python requests.get(url, stream=True, headers=headers). I want to receive the data using Spark and do further processing. I am not sure which is the best way to receive it in Spark. What options do I have? Some options I can think of: 1. push data from the rest api stream into a kafka queue and use the spark kafka streaming utilities to capture the data and process it further. 2. push data from the rest api stream to a local socket and use the spark socket stream utilities to capture the data and process it further (a sketch of this option follows below). 3. is there any other way to receive it? thanks VR
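For option 2, the Spark side of the socket route would look roughly like this (a sketch; the port is illustrative, and whatever pushes to the socket must write one JSON document per line):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
// each line written to the socket arrives as one string
val jsonLines = ssc.socketTextStream("localhost", 9999)
jsonLines.foreachRDD { rdd => /* parse the JSON and process */ }
ssc.start()
ssc.awaitTermination()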
Question about single/multi-pass execution in Spark-2.0 dataset/dataframe
case class Record(keyAttr: String, attr1: String, attr2: String, attr3: String) val ds = sparkSession.createDataset(rdd).as[Record] val attr1Counts = ds.groupBy("keyAttr", "attr1").count() val attr2Counts = ds.groupBy("keyAttr", "attr2").count() val attr3Counts = ds.groupBy("keyAttr", "attr3").count() // similar counts for 20 attributes // code to merge attr1Counts, attr2Counts and attr3Counts, // translate it to the desired output format and save the result. Some more details: 1) The application is a spark streaming application with a batch interval in the order of 5 - 10 mins 2) The data set is large, in the order of millions of records per batch 3) I'm using spark 2.0 The above implementation doesn't seem efficient at all if the data set is scanned once per count aggregation to compute attr1Counts, attr2Counts and attr3Counts. I'm concerned about the performance. Questions: 1) Does the catalyst optimization handle such queries and do a single pass on the dataset under the hood? 2) Is there a better way to do such aggregations, maybe using UDAFs? Or is it better to do RDD.reduceByKey for this use case? RDD.reduceByKey performs well for the data and batch interval of 5 - 10 mins. Not sure if the Dataset implementation as explained above will be equivalent or better. Thanks, Bharath
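One restructuring I have been considering (a sketch, not benchmarked: it trades the 20 scans for one scan by unpivoting the attributes first):

import sparkSession.implicits._

// one row per (attribute, key, value); a single groupBy then covers
// every per-attribute count in one pass over the data
val exploded = ds.flatMap { r =>
  Seq(("attr1", r.keyAttr, r.attr1),
      ("attr2", r.keyAttr, r.attr2),
      ("attr3", r.keyAttr, r.attr3)) // ...extend to all 20 attributes
}.toDF("attr", "keyAttr", "value")

val allCounts = exploded.groupBy("attr", "keyAttr", "value").count()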
Re: spark-submit failing but job running from scala ide
Hi Jacek/All, I restarted my terminal and then tried spark-submit again, and I am again getting those errors. How do I see how many "runtimes" are running, and how do I keep only one? Somehow my Spark 1.6 and Spark 2.0 are conflicting. How do I fix it? I installed Spark 1.6 earlier using these steps http://genomegeek.blogspot.com/2014/11/how-to-install-apache-spark-on-mac-os-x.html I installed Spark 2.0 using these steps http://blog.weetech.co/2015/08/light-learning-apache-spark.html Here is the output for run-example: m-C02KL0B1FFT4:bin vr$ ./run-example SparkPi Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 16/09/26 09:11:00 INFO SparkContext: Running Spark version 2.0.0 16/09/26 09:11:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 16/09/26 09:11:00 INFO SecurityManager: Changing view acls to: vr 16/09/26 09:11:00 INFO SecurityManager: Changing modify acls to: vr 16/09/26 09:11:00 INFO SecurityManager: Changing view acls groups to: 16/09/26 09:11:00 INFO SecurityManager: Changing modify acls groups to: 16/09/26 09:11:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vr); groups with view permissions: Set(); users with modify permissions: Set(vr); groups with modify permissions: Set() 16/09/26 09:11:01 INFO Utils: Successfully started service 'sparkDriver' on port 59323. 16/09/26 09:11:01 INFO SparkEnv: Registering MapOutputTracker 16/09/26 09:11:01 INFO SparkEnv: Registering BlockManagerMaster 16/09/26 09:11:01 INFO DiskBlockManager: Created local directory at /private/var/folders/23/ycbtxh8s551gzlsgj8q647d88gsjgb/T/blockmgr-d0d6dfea-2c97-4337-8e7d-0bbcb141f4c9 16/09/26 09:11:01 INFO MemoryStore: MemoryStore started with capacity 366.3 MB 16/09/26 09:11:01 INFO SparkEnv: Registering OutputCommitCoordinator 16/09/26 09:11:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 16/09/26 09:11:01 INFO Utils: Successfully started service 'SparkUI' on port 4041. 16/09/26 09:11:01 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.3:4041 16/09/26 09:11:01 INFO SparkContext: Added JAR file:/Users/vr/Downloads/spark-2.0.0/examples/target/scala-2.11/jars/scopt_2.11-3.3.0.jar at spark://192.168.1.3:59323/jars/scopt_2.11-3.3.0.jar with timestamp 1474906261472 16/09/26 09:11:01 INFO SparkContext: Added JAR file:/Users/vr/Downloads/spark-2.0.0/examples/target/scala-2.11/jars/spark-examples_2.11-2.0.0.jar at spark://192.168.1.3:59323/jars/spark-examples_2.11-2.0.0.jar with timestamp 1474906261473 16/09/26 09:11:01 INFO Executor: Starting executor ID driver on host localhost 16/09/26 09:11:01 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59324. 16/09/26 09:11:01 INFO NettyBlockTransferService: Server created on 192.168.1.3:59324 16/09/26 09:11:01 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.3, 59324) 16/09/26 09:11:01 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.3:59324 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.3, 59324) 16/09/26 09:11:01 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.3, 59324) 16/09/26 09:11:01 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect. 16/09/26 09:11:01 INFO SharedState: Warehouse path is 'file:/Users/vr/Downloads/spark-2.0.0/bin/spark-warehouse'. 
16/09/26 09:11:01 INFO SparkContext: Starting job: reduce at SparkPi.scala:38 16/09/26 09:11:02 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:38) with 2 output partitions 16/09/26 09:11:02 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:38) 16/09/26 09:11:02 INFO DAGScheduler: Parents of final stage: List() 16/09/26 09:11:02 INFO DAGScheduler: Missing parents: List() 16/09/26 09:11:02 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no missing parents 16/09/26 09:11:02 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1832.0 B, free 366.3 MB) 16/09/26 09:11:02 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB) 16/09/26 09:11:02 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.3:59324 (size: 1169.0 B, free: 366.3 MB) 16/09/26 09:11:02 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1012 16/09/26 09:11:02 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34) 16/09/26 09:11:02 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 16/09/26 09:11:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5474 bytes) 16/09/26 09:11:02 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, p
Running jobs against remote cluster from scala eclipse ide
Hi, I use Scala IDE for Eclipse. I usually run jobs against my local Spark installed on my mac, then export the jars, copy them to my company's Spark cluster, and run spark-submit on it. This works fine. But I want to run the jobs from the Scala IDE directly against my company's Spark cluster. The spark master URL of the company cluster is spark://spark-437-1-5963003:7077. One of the worker nodes of that cluster is 11.104.29.106. I tried this option, but I am getting an error: val conf = new SparkConf().setAppName("Simple Application").setMaster("spark://spark-437-1-5963003:7077").set("spark.driver.host","11.104.29.106") please let me know. 16/09/25 08:51:51 INFO SparkContext: Running Spark version 2.0.0 16/09/25 08:51:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 16/09/25 08:51:52 INFO SecurityManager: Changing view acls to: vr 16/09/25 08:51:52 INFO SecurityManager: Changing modify acls to: vr 16/09/25 08:51:52 INFO SecurityManager: Changing view acls groups to: 16/09/25 08:51:52 INFO SecurityManager: Changing modify acls groups to: 16/09/25 08:51:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vr); groups with view permissions: Set(); users with modify permissions: Set(vr); groups with modify permissions: Set() 16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1. (the same WARN repeats for all 16 retry attempts) 16/09/25 08:51:52 ERROR SparkContext: Error initializing SparkContext. java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries! Consider explicitly setting the appropriate port for the service 'sparkDriver' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries. 
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)

*full class code*

object RatingsCounter {
  /** Our main function where the action happens */
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.INFO)
    val conf = new SparkConf()
      .setAppName("Simple Application")
      .setMaster("spark://spark-437-1-5963003:7077")
      .set("spark.driver.host", "11.104.29.106")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("u.data")
    val ratings = lines.map(x => x.toString().split("\t")(2))
    val results = ratings.countByValue()
    val sortedResults = results.toSeq.sortBy(_._1)
    sortedResults.foreach(println)
  }
}
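A guess at the root cause, since 11.104.29.106 is one of the workers: spark.driver.host has to be an address of the machine the driver itself runs on (here, the laptop running the IDE), reachable from the cluster; pointing it at a worker's IP makes the driver try, and fail 16 times, to bind a socket on an address it does not own. A sketch of the corrected conf (the placeholder is hypothetical; substitute your own machine's routable IP):

val conf = new SparkConf()
  .setAppName("Simple Application")
  .setMaster("spark://spark-437-1-5963003:7077")
  // must be THIS machine's address as seen from the cluster
  .set("spark.driver.host", "<driver-machine-ip>")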
Re: spark-submit failing but job running from scala ide
Yes, I have both Spark 1.6 and Spark 2.0. I unset the SPARK_HOME environment variable and pointed spark-submit to 2.0. It's working now. How do I uninstall/remove Spark 1.6 from my Mac? Thanks On Sun, Sep 25, 2016 at 4:28 AM, Jacek Laskowski <ja...@japila.pl> wrote: > Hi, > > Can you execute run-example SparkPi with your Spark installation? > > Also, see the logs: > > 16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port > 4040. Attempting port 4041. > > 16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI' > on port 4041. > > You've got two Spark runtimes up that may or may not contribute to the > issue. > > Pozdrawiam, > Jacek Laskowski > ---- > https://medium.com/@jaceklaskowski/ > Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark > Follow me at https://twitter.com/jaceklaskowski > > > On Sun, Sep 25, 2016 at 8:36 AM, vr spark <vrspark...@gmail.com> wrote: > > Hi, > > I have this simple scala app which works fine when I run it as a scala > > application from the Scala IDE for Eclipse. > > But when I export it as a jar and run it from spark-submit I am getting the below > > error. Please suggest > > > > bin/spark-submit --class com.x.y.vr.spark.first.SimpleApp test.jar > > > > 16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. > > Attempting port 4041. > > > > 16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI' on port > > 4041. > > > > 16/09/24 23:15:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at > > http://192.168.1.3:4041 > > > > 16/09/24 23:15:15 INFO SparkContext: Added JAR > > file:/Users/vr/Downloads/spark-2.0.0/test.jar at > > spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210 > > > > 16/09/24 23:15:15 INFO Executor: Starting executor ID driver on host > > localhost > > > > 16/09/24 23:15:15 INFO Utils: Successfully started service > > 'org.apache.spark.network.netty.NettyBlockTransferService' on port > 59264. 
> > > > 16/09/24 23:15:15 INFO NettyBlockTransferService: Server created on > > 192.168.1.3:59264 > > > > 16/09/24 23:15:16 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID > > 0, localhost, partition 0, PROCESS_LOCAL, 5354 bytes) > > > > 16/09/24 23:15:16 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID > > 1, localhost, partition 1, PROCESS_LOCAL, 5354 bytes) > > > > 16/09/24 23:15:16 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) > > > > 16/09/24 23:15:16 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) > > > > 16/09/24 23:15:16 INFO Executor: Fetching > > spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210 > > > > 16/09/24 23:16:31 INFO Executor: Fetching > > spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210 > > > > 16/09/24 23:16:31 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) > > > > java.io.IOException: Failed to connect to /192.168.1.3:59263 > > > > at > > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228) > > > > at > > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) > > > > at > > org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:358) > > > > at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:324) > > > > at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:633) > > > > at org.apache.spark.util.Utils$.fetchFile(Utils.scala:459) > > > > at > > org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:488) > > > > at > > org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:480) > > > > at > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > > > > at > > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > > > > at > > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > > > > at > > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) > > > > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > > > > at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) > > > > at ...
spark-submit failing but job running from scala ide
Hi, I have this simple Scala app which works fine when I run it as a Scala application from the Scala IDE for Eclipse. But when I export it as a jar and run it from spark-submit I am getting the error below. Please suggest.

*bin/spark-submit --class com.x.y.vr.spark.first.SimpleApp test.jar*

16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI' on port 4041.
16/09/24 23:15:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.3:4041
16/09/24 23:15:15 INFO SparkContext: Added JAR file:/Users/vr/Downloads/spark-2.0.0/test.jar at spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
16/09/24 23:15:15 INFO Executor: Starting executor ID driver on host localhost
16/09/24 23:15:15 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59264.
16/09/24 23:15:15 INFO NettyBlockTransferService: Server created on 192.168.1.3:59264
16/09/24 23:15:16 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5354 bytes)
16/09/24 23:15:16 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1, PROCESS_LOCAL, 5354 bytes)
16/09/24 23:15:16 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/09/24 23:15:16 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/09/24 23:15:16 INFO Executor: Fetching spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
16/09/24 23:16:31 INFO Executor: Fetching spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
16/09/24 23:16:31 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.io.IOException: Failed to connect to /192.168.1.3:59263
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:358)
at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:324)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:633)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:459)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:488)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:480)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:480)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:252)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

*My Scala code*

package com.x.y.vr.spark.first

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/Users/vttrich/Downloads/spark-2.0.0/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext("local[*]", "RatingsCounter")
    //val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
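Two things stand out here. The SparkContext is built with a hardcoded master ("local[*]") instead of the SparkConf, so anything spark-submit passes in is ignored; and the executor's failure to connect back to 192.168.1.3:59263 suggests it cannot reach the address the driver advertised (for example after a network change or behind a firewall). A minimal sketch of the fix under those assumptions; the spark.driver.host override is a hypothetical workaround, only needed if the advertised IP really is unreachable:

import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Simple Application")
      // hypothetical workaround if executors cannot reach the advertised IP:
      .set("spark.driver.host", "localhost")
    val sc = new SparkContext(conf) // let spark-submit choose the master
    val logData = sc.textFile("/Users/vr/Downloads/spark-2.0.0/README.md", 2).cache()
    val numAs = logData.filter(_.contains("a")).count()
    val numBs = logData.filter(_.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()
  }
}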
Re: Undefined function json_array_to_map
Hi Ted/All, I did the following to get the full stack trace, but I am still not able to understand the root cause:

except Exception as error:
    traceback.print_exc()

and this is what I get...

File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'undefined function json_array_to_map; line 28 pos 73'

On Wed, Aug 17, 2016 at 8:59 AM, vr spark <vrspark...@gmail.com> wrote: > spark 1.6.1 > python > > I0817 08:51:59.099356 15189 detector.cpp:481] A new leading master (UPID= master@10.224.167.25:5050) is detected > I0817 08:51:59.099735 15188 sched.cpp:262] New master detected at > master@x.y.17.25:4550 > I0817 08:51:59.100888 15188 sched.cpp:272] No credentials provided. > Attempting to register without authentication > I0817 08:51:59.326017 15190 sched.cpp:641] Framework registered with > b859f266-9984-482d-8c0d-35bd88c1ad0a-6996 > 16/08/17 08:52:06 WARN ObjectStore: Version information not found in > metastore. hive.metastore.schema.verification is not enabled so recording > the schema version 1.2.0 > 16/08/17 08:52:06 WARN ObjectStore: Failed to get database default, > returning NoSuchObjectException > Traceback (most recent call last): > File "/data1/home/vttrich/spk/orig_qryhubb.py", line 17, in > res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date >= > 408910 limit 10") > File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/context.py", > line 580, in sql > File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", > line 813, in __call__ > File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", > line 51, in deco > pyspark.sql.utils.AnalysisException: u'undefined function > json_array_to_map; line 28 pos 73' > I0817 08:52:12.840224 15600 sched.cpp:1771] Asked to stop the driver > I0817 08:52:12.841198 15189 sched.cpp:1040] Stopping framework > 'b859f2f3-7484-482d-8c0d-35bd91c1ad0a-6326' > > > On Wed, Aug 17, 2016 at 8:50 AM, Ted Yu <yuzhih...@gmail.com> wrote: > >> Can you show the complete stack trace ? >> >> Which version of Spark are you using ? >> >> Thanks >> >> On Wed, Aug 17, 2016 at 8:46 AM, vr spark <vrspark...@gmail.com> wrote: >> >>> Hi, >>> I am getting error on below scenario. Please suggest. >>> >>> i have a virtual view in hive >>> >>> view name log_data >>> it has 2 columns >>> >>> query_map map<string,string> >>> >>> parti_date int >>> >>> >>> Here is my snippet for the spark data frame >>> >>> my dataframe >>> >>> res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date >= >>> 408910 limit 10") >>> >>> df=res.collect() >>> >>> print 'after collect' >>> >>> print df >>> >>> >>> * File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", >>> line 51, in deco* >>> >>> *pyspark.sql.utils.AnalysisException: u'undefined function >>> json_array_to_map; line 28 pos 73'* >>> >>> >>> >>> >>> >> >
Re: Attempting to accept an unknown offer
My code is very simple: if I use other Hive tables, my code works fine. This particular table (a virtual view) is huge and might have more metadata. It has only two columns.

The virtual view name is: cluster_table

# col_name    data_type
ln            string
parti         int

Here is the snippet...

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
import pyspark.sql
import json

myconf=SparkConf().setAppName("sql")
spcont=SparkContext(conf=myconf)
sqlcont=HiveContext(spcont)
res=sqlcont.sql("select parti FROM h.cluster_table WHERE parti > 408910 and parti <408911 limit 10")
print res.printSchema()
print 'res'
print res
df=res.collect()
print 'after collect'
print df

Here is the output after I submit the job:

I0817 09:18:40.606465 31409 sched.cpp:262] New master detected at master@x.y.17.56:6750
I0817 09:18:40.607461 31409 sched.cpp:272] No credentials provided. Attempting to register without authentication
I0817 09:18:40.612763 31409 sched.cpp:641] Framework registered with b859f2f3-7484-482d-8c0d-35bd91c1ad0a-6336
16/08/17 09:18:57 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/08/17 09:18:57 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
root
 |-- parti: integer (nullable = true)
None
res
DataFrame[partition_epoch_hourtenth: int]
2016-08-17 09:19:20,648:31315(0x7fafebfb1700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 19ms
2016-08-17 09:19:30,662:31315(0x7fafebfb1700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 13ms
W0817 09:20:01.715824 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676564
W0817 09:20:01.716455 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676630
W0817 09:20:01.716645 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676713
W0817 09:20:01.724409 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676554
W0817 09:20:01.724728 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676555
W0817 09:20:01.724936 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676556
W0817 09:20:01.725126 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676557
W0817 09:20:01.725309 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676558.

and many more lines like this on the screen with a similar message.

On Wed, Aug 17, 2016 at 9:08 AM, Ted Yu <yuzhih...@gmail.com> wrote: > Please include user@ in your reply. > > Can you reveal the snippet of hive sql ? > > On Wed, Aug 17, 2016 at 9:04 AM, vr spark <vrspark...@gmail.com> wrote: > >> spark 1.6.1 >> mesos >> job is running for like 10-15 minutes and giving this message and i >> killed it. >> >> In this job, i am creating data frame from a hive sql. There are other >> similar jobs which work fine >> >> On Wed, Aug 17, 2016 at 8:52 AM, Ted Yu <yuzhih...@gmail.com> wrote: >> >>> Can you provide more information ? >>> >>> Were you running on YARN ? >>> Which version of Spark are you using ? >>> >>> Was your job failing ?
>>> >>> Thanks >>> >>> On Wed, Aug 17, 2016 at 8:46 AM, vr spark <vrspark...@gmail.com> wrote: >>> >>>> >>>> W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an >>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492 >>>> >>>> W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an >>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493 >>>> >>>> W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an >>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494 >>>> >>>> W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an >>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495 >>>> >>>> W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an >>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496 >>>> >>>> W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an >>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497 >>>> >>>> W0816 23:17:01.985801 16360 sched.cpp:1195] Attempting to acce
Attempting to accept an unknown offer
W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492
W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493
W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494
W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495
W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496
W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497
W0816 23:17:01.985801 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910498
W0816 23:17:01.985961 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910499
W0816 23:17:01.986121 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910500
2016-08-16 23:18:41,877:16226(0x7f71271b6700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 13ms
2016-08-16 23:21:12,007:16226(0x7f71271b6700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 11ms
Undefined function json_array_to_map
Hi, I am getting an error in the scenario below. Please suggest.

I have a virtual view in Hive. The view name is log_data and it has 2 columns:

query_map map<string,string>
parti_date int

Here is my snippet for the Spark data frame:

res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date >= 408910 limit 10")
df=res.collect()
print 'after collect'
print df

* File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 51, in deco*
*pyspark.sql.utils.AnalysisException: u'undefined function json_array_to_map; line 28 pos 73'*
Re: dataframe row list question
Hi Experts, Please suggest On Thu, Aug 11, 2016 at 7:54 AM, vr spark <vrspark...@gmail.com> wrote: > > I have data which is json in this format > > myList: array > |||-- elem: struct > ||||-- nm: string (nullable = true) > ||||-- vList: array (nullable = true) > |||||-- element: string (containsNull = true) > > > from my kafka stream, i created a dataframe using sqlContext.jsonRDD > Then registred it as registerTempTable > selected mylist from this table and i see this output. It is a list of > rows > > [Row(nm=u'Apt', vList=[u'image']), Row(nm=u'Agent', vList=[u'Mozilla/5.0 > ']), Row(nm=u'Ip', vList=[u'xx.yy.106.25'])] > > My requirement is to get only rows with nm='IP' and its corresponding > value > I would need IP, xx.yy.106.25 > > > Please suggest >
dataframe row list question
I have data which is JSON in this format:

myList: array
 |-- elem: struct
 |    |-- nm: string (nullable = true)
 |    |-- vList: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

From my Kafka stream, I created a dataframe using sqlContext.jsonRDD, then registered it as a temp table with registerTempTable. I selected myList from this table and I see this output. It is a list of rows:

[Row(nm=u'Apt', vList=[u'image']), Row(nm=u'Agent', vList=[u'Mozilla/5.0 ']), Row(nm=u'Ip', vList=[u'xx.yy.106.25'])]

My requirement is to get only the rows with nm='IP' and their corresponding value; I would need Ip, xx.yy.106.25.

Please suggest.
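One way to pull out just those entries is to explode the array of structs and filter on nm; a minimal sketch in Scala (the PySpark calls are analogous), assuming the dataframe is named df and noting that the data above actually stores the key as 'Ip', not 'IP':

import org.apache.spark.sql.functions.explode

val pairs = df
  .select(explode(df("myList")).as("e"))  // one row per struct in the array
  .select("e.nm", "e.vList")
  .filter("nm = 'Ip'")                    // keep only the IP entries
pairs.show()                              // should yield Ip, xx.yy.106.25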
Spark SQL -JDBC connectivity
Hi, I would like to know the steps to connect to Spark SQL from the Spring framework (web UI), and also how to run and deploy the web application.
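A plain SparkContext has no JDBC listener, so the usual way to reach Spark SQL from an external web application such as a Spring app is to start the Spark Thrift Server (sbin/start-thriftserver.sh), which speaks the HiveServer2 protocol, and query it with the Hive JDBC driver. A minimal sketch (Scala shown; the same java.sql calls work from Spring's JdbcTemplate), where the host, port, and table name are assumptions:

import java.sql.DriverManager

object SparkSqlJdbc {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("SELECT * FROM some_table LIMIT 10") // some_table is hypothetical
    while (rs.next()) println(rs.getString(1))
    rs.close(); stmt.close(); conn.close()
  }
}

The web application itself is then built and deployed like any other Spring app (for example a WAR on Tomcat); Spark only enters the picture through the JDBC URL.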
Re: read only specific jsons
Hi, I tried it and am still getting an exception. Any other suggestion?

clickDF = cDF.filter(cDF['request.clientIP'].isNotNull())

It fails for some cases and errors out with the message below:

AnalysisException: u'No such struct field clientIP in cookies, nscClientIP1, nscClientIP2, uAgent;'

On Tue, Jul 26, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote: > Have you tried filtering out corrupt records with something along the > lines of > > df.filter(df("_corrupt_record").isNull) > > On Tue, Jul 26, 2016 at 1:53 PM, vr spark <vrspark...@gmail.com> wrote: > > i am reading data from kafka using spark streaming. > > > > I am reading json and creating dataframe. > > I am using pyspark > > > > kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams) > > > > lines = kvs.map(lambda x: x[1]) > > > > lines.foreachRDD(mReport) > > > > def mReport(clickRDD): > > > >clickDF = sqlContext.jsonRDD(clickRDD) > > > >clickDF.registerTempTable("clickstream") > > > >PagesDF = sqlContext.sql( > > > > "SELECT request.clientIP as ip " > > > > "FROM clickstream " > > > > "WHERE request.clientIP is not null " > > > > " limit 2000 " > > > > > > The problem is that not all the jsons from the stream have the same > format. > > > > It works when it reads a json which has ip. > > > > Some of the json strings do not have client ip in their schema. > > > > So i am getting error and my job is failing when it encounters such a > json. > > > > How do read only those json which has ip in their schema? > > > > Please suggest. >
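The AnalysisException says the batch's inferred schema simply has no request.clientIP field, so one option is to test for the field before querying it. A minimal sketch in Scala (the same schema inspection exists in PySpark via df.schema), assuming the per-batch dataframe is cDF as above:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

def clientIPs(cDF: DataFrame): Option[DataFrame] = {
  // does the inferred schema of this batch contain request.clientIP?
  val hasClientIP = cDF.schema.fields.find(_.name == "request").exists(_.dataType match {
    case s: StructType => s.fieldNames.contains("clientIP")
    case _             => false
  })
  if (hasClientIP) Some(cDF.filter(cDF("request.clientIP").isNotNull)) else None
}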
read only specific jsons
I am reading data from Kafka using Spark Streaming. I am reading JSON and creating a dataframe. I am using PySpark.

kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
lines = kvs.map(lambda x: x[1])
lines.foreachRDD(mReport)

def mReport(clickRDD):
    clickDF = sqlContext.jsonRDD(clickRDD)
    clickDF.registerTempTable("clickstream")
    PagesDF = sqlContext.sql(
        "SELECT request.clientIP as ip "
        "FROM clickstream "
        "WHERE request.clientIP is not null "
        " limit 2000 "
    )

The problem is that not all the JSONs from the stream have the same format. It works when it reads a JSON which has the IP, but some of the JSON strings do not have the client IP in their schema, so I am getting an error and my job fails when it encounters such a JSON. How do I read only those JSONs which have the IP in their schema? Please suggest.
Error in Word Count Program
val textFile = sc.textFile("README.md")
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark.saveAsTextFile("output1")

Same error:

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/user/spark-1.5.1-bin-hadoop2.4/bin/README.md
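The path in the error message is the clue: a relative path like "README.md" is resolved against the shell's working directory (here bin/), not against where the file actually lives. A minimal sketch of the fix, with the absolute paths below being assumptions:

val textFile = sc.textFile("file:///home/user/spark-1.5.1-bin-hadoop2.4/README.md")
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark.saveAsTextFile("file:///home/user/output1") // must not already exist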
Input path does not exist error in giving input file for word count program
val count = inputfile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _);

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
Re: Getting error in inputfile | inputFile
scala> val count = inputfile.flatMap(line => line.split((" ").map(word => (word,1)).reduceByKey(_ + _)
     |
     |
You typed two blank lines.  Starting a new command.

This is what I am getting; how do I solve it?

Regards,
Ramkrishna KT
Getting error in inputfile | inputFile
I am using Spark version 1.5.1 and I am getting errors in my first Spark program, i.e., word count. Please help me solve this.

*scala> val inputfile = sc.textFile("input.txt")*
*inputfile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at :21*

*scala> val counts = inputFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _);*
*:19: error: not found: value inputFile*
* val counts = inputFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _);*
*^*
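Two separate problems appear in this thread. The REPL hang reported above ("You typed two blank lines") comes from unbalanced parentheses: line.split((" ").map(...) opens more parentheses than it closes, so the REPL keeps waiting for more input. The "not found: value inputFile" error is plain case sensitivity: the RDD was bound to inputfile (lowercase f) but then referenced as inputFile. A corrected version with one consistent name and balanced parentheses:

val inputFile = sc.textFile("input.txt")
val counts = inputFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.collect().foreach(println) // or counts.saveAsTextFile("counts_out"), path is an assumption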
Unable to Run Spark Streaming Job in Hadoop YARN mode
Hi All, I am unable to run a Spark Streaming job in my Hadoop cluster; it is behaving unexpectedly. When I submit a job, it fails by throwing a socket exception in HDFS; if I run the same job a second or third time, it runs for some time and then stops. I am confused. Is there any configuration in yarn-site.xml specific to Spark? Please suggest.
Re: overriding spark.streaming.blockQueueSize default value
Pinging back. I hope someone else has seen this behavior where spark.streaming.blockQueueSize becomes a bottleneck. Is there a suggestion on how to adjust the queue size, or any documentation on what the effects would be? It seems to be straightforward, but I am just trying to learn from others' experiences. Thanks, On Mon, Mar 28, 2016 at 10:40 PM, Spark Newbie <sparknewbie1...@gmail.com> wrote: > Hi All, > > The default value for spark.streaming.blockQueueSize is 10 in > https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala. > In spark kinesis asl 1.4 the received Kinesis records are stored by calling > addData on line 115 - > https://github.com/apache/spark/blob/branch-1.4/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala#L115 > which pushes one data item to the buffer. This is a problem because, at > application startup, a single Kinesis Worker gains lease for all (or a > majority of) shards for the Kinesis stream. This is by design, KCL load > balances as new Workers are started. But, the single Worker which initially > gains lease for a lot of shards, ends up being blocked on the addData > method, as there will be many KinesisRecordProcessor threads trying to add > the received data to the buffer. The buffer uses a ArrayBlockingQueue > with the size specified in spark.streaming.blockQueueSize which is set to > 10 by default. The > ArrayBlockingQueue is flushed out to memorystore every 100ms. So the > KinesisRecordProcessor threads will be blocked for long period (like upto > an hour) on application startup. The impact is that there will be some > Kinesis shards that don't get consumed by the spark streaming application, > until its KinesisRecordProcessor thread gets unblocked. > > To fix/work around the issue would it be ok to increase the > spark.streaming.blockQueueSize to a larger value. I suppose the main > consideration when increasing this size would be the memory allocated to > the executor. I haven't seen much documentation on this config. And any > advise on how to fine tune this would be useful. > > Thanks, > Spark newbie >
overriding spark.streaming.blockQueueSize default value
Hi All, The default value for spark.streaming.blockQueueSize is 10 in https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala. In spark-kinesis-asl 1.4 the received Kinesis records are stored by calling addData on line 115 - https://github.com/apache/spark/blob/branch-1.4/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala#L115 - which pushes one data item to the buffer. This is a problem because, at application startup, a single Kinesis Worker gains the lease for all (or a majority of) shards of the Kinesis stream. This is by design: the KCL load-balances as new Workers are started. But the single Worker which initially gains the lease for a lot of shards ends up being blocked on the addData method, as there will be many KinesisRecordProcessor threads trying to add the received data to the buffer. The buffer uses an ArrayBlockingQueue with the size specified in spark.streaming.blockQueueSize, which is set to 10 by default. The ArrayBlockingQueue is flushed out to the memory store every 100 ms, so the KinesisRecordProcessor threads can be blocked for a long period (up to an hour) on application startup. The impact is that some Kinesis shards don't get consumed by the Spark Streaming application until their KinesisRecordProcessor threads get unblocked.

To fix/work around the issue, would it be OK to increase spark.streaming.blockQueueSize to a larger value? I suppose the main consideration when increasing this size would be the memory allocated to the executor. I haven't seen much documentation on this config, and any advice on how to fine-tune it would be useful. Thanks, Spark newbie
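For reference, a sketch of how the value could be raised. spark.streaming.blockQueueSize is internal and undocumented, so the value below is an experiment rather than a recommendation; the main cost of a larger queue is the extra executor memory holding buffered blocks between the 100 ms flushes:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kinesis-consumer")               // name is an assumption
  .set("spark.streaming.blockQueueSize", "100") // default is 10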
Issues facing while Running Spark Streaming Job in YARN cluster mode
Hi, I am able to run my Spark Streaming job in local mode, but when I try to run the same job in my YARN cluster, it throws errors. Any help is appreciated in this regard. Here are my exception logs:

Exception 1:

java.net.SocketTimeoutException: 48 millis timeout while waiting for channel to be ready for write. ch : java.nio.channels.SocketChannel[connected local=/172.16.28.192:50010 remote=/172.16.28.193:46147]
at org.apache.hadoop.net.SocketIOWithTimeout.waitForIO(SocketIOWithTimeout.java:246)
at org.apache.hadoop.net.SocketOutputStream.waitForWritable(SocketOutputStream.java:172)
at org.apache.hadoop.net.SocketOutputStream.transferToFully(SocketOutputStream.java:220)
at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:559)
at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:728)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:496)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opReadBlock(Receiver.java:116)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:235)
at java.lang.Thread.run(Thread.java:745)

Exception 2:

2016-03-22 12:17:47,838 WARN org.apache.hadoop.hdfs.BlockReaderFactory: I/O error constructing remote block reader.
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:658)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3101)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:755)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:670)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
at org.apache.hadoop.hdfs.DFSInputStream.seekToBlockSource(DFSInputStream.java:1460)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:773)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:806)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:847)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:84)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:52)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:112)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:265)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2016-03-22 12:17:47,838 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1458629096860_0001_01_01 transitioned from KILLING to DONE 2016-03-22 12:17:47,841 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application: Removing container_1458629096860_0001_01_01 from application application_1458629096860_0001 2016-03-22 12:17:47,842 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got event CONTAINER_STOP for appId application_1458629096860_0001 2016-03-22 12:17:47,842 WARN org.apache.hadoop.hdfs.DFSClient: Failed to connect to /node1:50010 for block, add to deadNodes and continue. java.nio.channels.ClosedByInterruptException java.nio.channels.ClosedByInterruptException
How to Catch Spark Streaming Twitter Exception ( Written Java)
Dear All, I am facing a problem with my Spark Twitter Streaming code: whenever twitter4j throws an exception, I am unable to catch it. Could anyone help me catch that exception? Here is the pseudo code:

SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("Test");
//SparkConf sparkConf = new SparkConf().setMaster("yarn-client").setAppName("Test");
// SparkTwitterStreaming sss = new SparkTwitterStreaming();
final int batchIntervalInSec = 60; // choose long interval to reduce num files created
Duration batchInterval = Durations.seconds(batchIntervalInSec);
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchInterval);
try {
    JavaReceiverInputDStream receiverStream = null;
    receiverStream = TwitterUtils.createStream(jssc, a2, params);
    JavaDStream tweets = receiverStream.map(new Function<Status, String>() {
        public String call(Status tweet) throws Exception {
            String json = "";
            Gson gson = new Gson();
            ...
        }
    });
    jssc.start();
    jssc.awaitTermination();
}

I have no issue with streaming the Twitter data. Whenever the Twitter account credentials have expired, I have to catch this exception and work around it. Here is the exception trace:

INFO spark.streaming.receiver.BlockGenerator - Started BlockGenerator
INFO spark.streaming.scheduler.ReceiverTracker - Registered receiver for stream 0 from 172.16.28.183:34829
INFO spark.streaming.receiver.ReceiverSupervisorImpl - Starting receiver
INFO spark.streaming.twitter.TwitterReceiver - Twitter receiver started
INFO spark.streaming.receiver.ReceiverSupervisorImpl - Called receiver onStart
INFO spark.streaming.receiver.ReceiverSupervisorImpl - Waiting for receiver to be stopped
INFO twitter4j.TwitterStreamImpl - Establishing connection.
INFO twitter4j.TwitterStreamImpl - 401:Authentication credentials ( https://dev.twitter.com/pages/auth) were missing or incorrect. Ensure that you have set valid consumer key/secret, access token/secret, and the system clock is in sync. \n\n\nError 401 Unauthorized HTTP ERROR: 401 Problem accessing '/1.1/statuses/filter.json'. Reason: Unauthorized
INFO twitter4j.TwitterStreamImpl - Waiting for 1 milliseconds
WARN spark.streaming.receiver.ReceiverSupervisorImpl - Restarting receiver with delay 2000 ms: Error receiving tweets 401:Authentication credentials (https://dev.twitter.com/pages/auth) were missing or incorrect. Ensure that you have set valid consumer key/secret, access token/secret, and the system clock is in sync. \n\n\nError 401 Unauthorized HTTP ERROR: 401 Problem accessing '/1.1/statuses/filter.json'. Reason: Unauthorized
Relevant discussions can be found on the Internet at: http://www.google.co.jp/search?q=944a924a or http://www.google.co.jp/search?q=24fd66dc
TwitterException{exceptionCode=[944a924a-24fd66dc], statusCode=401, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=3.0.3}
at twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:177)
at twitter4j.internal.http.HttpClientWrapper.request(HttpClientWrapper.java:61)
at twitter4j.internal.http.HttpClientWrapper.post(HttpClientWrapper.java:98)
at twitter4j.TwitterStreamImpl.getFilterStream(TwitterStreamImpl.java:304)
at twitter4j.TwitterStreamImpl$7.getStream(TwitterStreamImpl.java:292)
at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:462)

Please let me know if you need more clarity on my question. Thanks, Sony.
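Part of the difficulty is that the 401 is thrown inside the receiver, which runs on an executor, so a try/catch around jssc.start() in the driver never sees it. One way to observe receiver failures from the driver is a StreamingListener; a minimal sketch in Scala (Java can implement the same interface), where the reaction in the callback is up to you:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError}

def watchReceiver(ssc: StreamingContext): Unit = {
  ssc.addStreamingListener(new StreamingListener {
    override def onReceiverError(event: StreamingListenerReceiverError): Unit = {
      // carries the twitter4j message, e.g. the 401 text
      println("Receiver error: " + event.receiverInfo.lastErrorMessage)
      // react here: alert, or stop the context from a separate thread
    }
  })
}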
Terminate Spark job in eclipse
Hi Friends, Can anyone help me with how to terminate a Spark job in Eclipse using Java code? Thanks, Soniya
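A minimal sketch (Scala shown; the Java API exposes the same methods), assuming sc is the running SparkContext created in the Eclipse project:

sc.cancelAllJobs() // cancel every active job but keep the context alive
sc.stop()          // or shut the whole application down
// for a streaming application:
// ssc.stop(stopSparkContext = true, stopGracefully = true)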
Spark Twitter streaming
Hello friends, I need urgent help. I am using Spark Streaming to get tweets from Twitter and load the data into HDFS. I want to find out the tweet source: whether it is from web, mobile web, Facebook, etc. Could you please help me with the logic? Thanks, Soniya
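twitter4j exposes this directly: Status.getSource() returns the posting client as an HTML anchor, so stripping the tags leaves the client name. A minimal sketch in Scala:

// e.g. "<a href=\"...\">Twitter Web Client</a>" -> "Twitter Web Client"
def tweetSource(status: twitter4j.Status): String =
  status.getSource.replaceAll("<[^>]*>", "")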
Re: spark job submisson on yarn-cluster mode failing
Hi, I am facing the error message below now. Please help me.

2016-01-21 16:06:14,123 WARN org.apache.hadoop.hdfs.DFSClient: Failed to connect to /xxx.xx.xx.xx:50010 for block, add to deadNodes and continue. java.nio.channels.ClosedByInterruptException
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:658)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3101)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:755)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:670)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
at org.apache.hadoop.hdfs.DFSInputStream.seekToBlockSource(DFSInputStream.java:1460)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:773)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:806)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:847)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:84)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:52)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:112)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:265)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Thanks Soniya

On Thu, Jan 21, 2016 at 5:42 PM, Ted Yu <yuzhih...@gmail.com> wrote: > Please also check AppMaster log. > > Thanks > > On Jan 21, 2016, at 3:51 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote: > > Can you look in the executor logs and see why the sparkcontext is being > shutdown? Similar discussion happened here previously. > http://apache-spark-user-list.1001560.n3.nabble.com/RECEIVED-SIGNAL-15-SIGTERM-td23668.html > > Thanks > Best Regards > > On Thu, Jan 21, 2016 at 5:11 PM, Soni spark <soni2015.sp...@gmail.com> > wrote: > >> Hi Friends, >> >> I spark job is successfully running on local mode but failing on cluster >> mode. Below is the error message i am getting. anyone can help me. >> >> >> >> 16/01/21 16:38:07 INFO twitter4j.TwitterStreamImpl: Establishing connection.
>> 16/01/21 16:38:07 INFO twitter.TwitterReceiver: Twitter receiver started >> 16/01/21 16:38:07 INFO receiver.ReceiverSupervisorImpl: Called receiver >> onStart >> 16/01/21 16:38:07 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver >> to be stopped*16/01/21 16:38:10 ERROR yarn.ApplicationMaster: RECEIVED >> SIGNAL 15: SIGTERM* >> 16/01/21 16:38:10 INFO streaming.StreamingContext: Invoking >> stop(stopGracefully=false) from shutdown hook >> 16/01/21 16:38:10 INFO scheduler.ReceiverTracker: Sent stop signal to all 1 >> receivers >> 16/01/21 16:38:10 INFO receiver.ReceiverSupervisorImpl: Received stop signal >> 16/01/21 16:38:10 INFO receiver.ReceiverSupervisorImpl: Stopping receiver >> with message: Stopped by driver: >> 16/01/21 16:38:10 INFO twitter.TwitterReceiver: Twitter receiver stopped >> 16/01/21 16:38:10 INFO receiver.ReceiverSupervisorImpl: Called receiver >> onStop >> 16/01/21 16:38:10 INFO receiver.ReceiverSupervisorImpl: Deregistering >> receiver 0*16/01/21 16:38:10 ERROR scheduler.ReceiverTracker: Deregistered >> receiver for stream 0: Stopped by driver* >> 16/01/21 16:38:10 INFO receiver.ReceiverSupervisorImpl: Stopped receiver 0 >> 16/01/21 16:38:10 INFO receiver.BlockGenerator: Stopping BlockGenerator >> 16/01/21 16:38:10 INFO yarn.ApplicationMaster: Waiting for spark context >> initialization ... >> >> Thanks >> >> Soniya >> >> >
spark job submisson on yarn-cluster mode failing
Hi Friends, my Spark job runs successfully in local mode but fails in cluster mode. Below is the error message I am getting; can anyone help me?

16/01/21 16:38:07 INFO twitter4j.TwitterStreamImpl: Establishing connection.
16/01/21 16:38:07 INFO twitter.TwitterReceiver: Twitter receiver started
16/01/21 16:38:07 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
16/01/21 16:38:07 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped
*16/01/21 16:38:10 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM*
16/01/21 16:38:10 INFO streaming.StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
16/01/21 16:38:10 INFO scheduler.ReceiverTracker: Sent stop signal to all 1 receivers
16/01/21 16:38:10 INFO receiver.ReceiverSupervisorImpl: Received stop signal
16/01/21 16:38:10 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver:
16/01/21 16:38:10 INFO twitter.TwitterReceiver: Twitter receiver stopped
16/01/21 16:38:10 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
16/01/21 16:38:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
*16/01/21 16:38:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver*
16/01/21 16:38:10 INFO receiver.ReceiverSupervisorImpl: Stopped receiver 0
16/01/21 16:38:10 INFO receiver.BlockGenerator: Stopping BlockGenerator
16/01/21 16:38:10 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...

Thanks, Soniya
Re: ClassNotFoundException when executing spark jobs in standalone/cluster mode on Spark 1.5.2
You need to make sure this class is accessible to all servers, since it is cluster mode and the driver can be on any of the worker nodes. On Fri, Dec 25, 2015 at 5:57 PM, Saiph Kappa <saiph.ka...@gmail.com> wrote: > Hi, > > I'm submitting a spark job like this: > > ~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class Benchmark --master >> spark://machine1:6066 --deploy-mode cluster --jars >> target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar >> /home/user/bench/target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar 1 >> machine2 1000 >> > > and in the driver stderr, I get the following exception: > > WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 74, XXX.XXX.XX.XXX): >> java.lang.ClassNotFoundException: Benchmark$$anonfun$main$1 >> at java.net.URLClassLoader$1.run(URLClassLoader.java:366) >> at java.net.URLClassLoader$1.run(URLClassLoader.java:355) >> at java.security.AccessController.doPrivileged(Native Method) >> at java.net.URLClassLoader.findClass(URLClassLoader.java:354) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:425) >> at java.lang.ClassLoader.loadClass(ClassLoader.java:358) >> at java.lang.Class.forName0(Native Method) >> at java.lang.Class.forName(Class.java:270) >> at >> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) >> at >> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) >> at >> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) >> at >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) >> at >> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >> at >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) >> at >> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >> at >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) >> at >> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >> at >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >> at >> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) >> at >> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72) >> at >> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98) >> at >> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) >> at org.apache.spark.scheduler.Task.run(Task.scala:88) >> at >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> at java.lang.Thread.run(Thread.java:745) >> > > Note that everything works fine when using deploy-mode as 'client'.
> This is the application that I'm trying to run: > https://github.com/tdas/spark-streaming-benchmark (this problem also > happens for non streaming applications) > > What can I do to sort this out? > > Thanks. >
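A concrete way to apply the advice above, assuming HDFS is reachable from every node: place the application jar on shared storage and point spark-submit at that location, since in cluster mode the jar URL must be globally visible inside the cluster (the hdfs:// path here is an assumption):

~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class Benchmark --master spark://machine1:6066 \
  --deploy-mode cluster hdfs:///apps/benchmark-app_2.10-0.1-SNAPSHOT.jar 1 machine2 1000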
Re: why one of Stage is into Skipped section instead of Completed
Thank you Silvio for the update. On Sat, Dec 26, 2015 at 1:14 PM, Silvio Fiorito < silvio.fior...@granturing.com> wrote: > Skipped stages result from existing shuffle output of a stage when > re-running a transformation. The executors will have the output of the > stage in their local dirs and Spark recognizes that, so rather than > re-computing, it will start from the following stage. So, this is a good > thing in that you’re not re-computing a stage. In your case, it looks like > there’s already the output of the userreqs RDD (reduceByKey) so it doesn’t > re-compute it. > > From: Prem Spark <sparksure...@gmail.com> > Date: Friday, December 25, 2015 at 11:41 PM > To: "user@spark.apache.org" <user@spark.apache.org> > Subject: why one of Stage is into Skipped section instead of Completed > > > Whats does the below Skipped Stage means. can anyone help in clarifying? > I was expecting 3 stages to get Succeeded but only 2 of them getting > completed while one is skipped. > Status: SUCCEEDED > Completed Stages: 2 > Skipped Stages: 1 > > Scala REPL Code Used: > > accounts is a basic RDD contains weblog text data. > > var accountsByID = accounts. > > map(line => line.split(',')). > > map(values => (values(0),values(4)+','+values(3))); > > var userreqs = sc. > > textFile("/loudacre/weblogs/*6"). > > map(line => line.split(' ')). > > map(words => (words(2),1)). > > reduceByKey((v1,v2) => v1 + v2); > > var accounthits = > > accountsByID.join(userreqs).map(pair => pair._2) > > accounthits. > > saveAsTextFile("/loudacre/userreqs") > > scala> accounthits.toDebugString > res15: String = > (32) MapPartitionsRDD[24] at map at :28 [] > | MapPartitionsRDD[23] at join at :28 [] > | MapPartitionsRDD[22] at join at :28 [] > | CoGroupedRDD[21] at join at :28 [] > +-(15) MapPartitionsRDD[15] at map at :25 [] > | | MapPartitionsRDD[14] at map at :24 [] > | | /loudacre/accounts/* MapPartitionsRDD[13] at textFile at > :21 [] > | | /loudacre/accounts/* HadoopRDD[12] at textFile at :21 [] > | ShuffledRDD[20] at reduceByKey at :25 [] > +-(32) MapPartitionsRDD[19] at map at :24 [] > | MapPartitionsRDD[18] at map at :23 [] > | /loudacre/weblogs/*6 MapPartitionsRDD[17] at textFile at > :22 [] > | /loudacre/weblogs/*6 HadoopRDD[16] at textFile at > > > > > > >
Can anyone explain Spark behavior for below? Kudos in Advance
Scenario 1:

val z = sc.parallelize(List("12","23","345",""),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res143: String = 10

Scenario 2:

val z = sc.parallelize(List("12","23","","345"),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res144: String = 11

Why is the result different? I was expecting 10 for both, and also for the first partition.
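A worked trace makes the difference visible (assuming parallelize splits the four elements evenly into two partitions of two):

// Scenario 1: partitions ("12","23") and ("345","")
//   partition 1: min("".length, "12".length)   = 0 -> "0"
//                min("0".length, "23".length)  = 1 -> "1"
//   partition 2: min("".length, "345".length)  = 0 -> "0"
//                min("0".length, "".length)    = 0 -> "0"
//   combOp: "" + "1" + "0" = "10"
//
// Scenario 2: partitions ("12","23") and ("","345")
//   partition 1: "1" as above
//   partition 2: min("".length, "".length)     = 0 -> "0"
//                min("0".length, "345".length) = 1 -> "1"
//   combOp: "" + "1" + "1" = "11"
//
// The seqOp feeds its own *string* result back in as x, so after the first
// element x always has length 1. The function is neither associative nor
// commutative, which aggregate requires, so moving "" between partitions
// changes the answer (and the order in which partitions combine could even
// yield "01" instead of "10").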
why one of Stage is into Skipped section instead of Completed
What does the below Skipped Stage mean? Can anyone help clarify? I was expecting 3 stages to succeed, but only 2 of them complete while one is skipped.

Status: SUCCEEDED
Completed Stages: 2
Skipped Stages: 1

Scala REPL code used (accounts is a basic RDD containing weblog text data):

var accountsByID = accounts.
  map(line => line.split(',')).
  map(values => (values(0),values(4)+','+values(3)));

var userreqs = sc.
  textFile("/loudacre/weblogs/*6").
  map(line => line.split(' ')).
  map(words => (words(2),1)).
  reduceByKey((v1,v2) => v1 + v2);

var accounthits = accountsByID.join(userreqs).map(pair => pair._2)

accounthits.saveAsTextFile("/loudacre/userreqs")

scala> accounthits.toDebugString
res15: String =
(32) MapPartitionsRDD[24] at map at :28 []
 | MapPartitionsRDD[23] at join at :28 []
 | MapPartitionsRDD[22] at join at :28 []
 | CoGroupedRDD[21] at join at :28 []
 +-(15) MapPartitionsRDD[15] at map at :25 []
 |  | MapPartitionsRDD[14] at map at :24 []
 |  | /loudacre/accounts/* MapPartitionsRDD[13] at textFile at :21 []
 |  | /loudacre/accounts/* HadoopRDD[12] at textFile at :21 []
 | ShuffledRDD[20] at reduceByKey at :25 []
 +-(32) MapPartitionsRDD[19] at map at :24 []
 | MapPartitionsRDD[18] at map at :23 []
 | /loudacre/weblogs/*6 MapPartitionsRDD[17] at textFile at :22 []
 | /loudacre/weblogs/*6 HadoopRDD[16] at textFile at
Unable to create hive table using HiveContext
Hi friends, I am trying to create a Hive table through Spark with Java code in Eclipse, using the code below.

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");

But I am getting the error "ERROR XBM0J: Directory /home/workspace4/Test/metastore_db already exists." I am not sure why the metastore is being created in the workspace. Please help me. Thanks, Soniya
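For background: with no hive-site.xml on the classpath, HiveContext falls back to an embedded Derby metastore, and Derby creates metastore_db inside the JVM's working directory, which in Eclipse is the project folder. ERROR XBM0J usually points to a leftover metastore_db (or its db.lck lock file) from a previous run; deleting that directory clears the immediate error. To keep it out of the workspace entirely, one hedged sketch (Scala shown; the same System.setProperty call works from Java, and the /tmp path is an assumption) is to point Derby at a fixed location before the HiveContext is created:

// run before constructing the HiveContext; may also be set via hive-site.xml
System.setProperty("javax.jdo.option.ConnectionURL",
  "jdbc:derby:;databaseName=/tmp/spark_metastore_db;create=true")
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)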
create hive table in Spark with Java code
Hi Friends, I have created a Hive external table with a partition. I want to alter the Hive table partition through Spark with Java code:

alter table table1 add if not exists partition(datetime='2015-12-01') location 'hdfs://localhost:54310/spark/twitter/datetime=2015-12-01/'

I am currently executing the above query manually; I want to execute it through Spark with Java code. Please help me. Thanks, Soniya
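DDL statements like this can be issued straight through the HiveContext's sql method, so no manual step is needed. A minimal sketch (Scala shown; from Java it is the same sqlContext.sql(...) call on the HiveContext), reusing the exact statement from the question:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql(
  "ALTER TABLE table1 ADD IF NOT EXISTS PARTITION (datetime='2015-12-01') " +
  "LOCATION 'hdfs://localhost:54310/spark/twitter/datetime=2015-12-01/'")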
How do I link JavaEsSpark.saveToEs() to a sparkConf?
Folks, I have the following program:

SparkConf conf = new SparkConf().setMaster("local").setAppName("Indexer").set("spark.driver.maxResultSize", "2g");
conf.set("es.index.auto.create", "true");
conf.set("es.nodes", "localhost");
conf.set("es.port", "9200");
conf.set("es.write.operation", "index");
JavaSparkContext sc = new JavaSparkContext(conf);
.
.
JavaEsSpark.saveToEs(filteredFields, "foo");

I get an error saying it cannot find storage. It looks like the driver program cannot reach the Elasticsearch server. Looking at the program, I have not associated JavaEsSpark with the SparkConf. Question: how do I associate JavaEsSpark with SparkConf?
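For what it's worth, no explicit association is needed: saveToEs reads the es.* settings from the SparkContext that produced the RDD, so as long as filteredFields came from the sc built above, the configuration is already attached. A storage-related error then more likely means the es.nodes/es.port endpoint is unreachable, or the index is missing with auto-create disabled. A minimal sketch in Scala of the same wiring (elasticsearch-hadoop's native API, which JavaEsSpark wraps; the index/type name "foo/docs" is an assumption):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds saveToEs to RDDs

val conf = new SparkConf().setMaster("local").setAppName("Indexer")
  .set("es.nodes", "localhost")
  .set("es.port", "9200")
  .set("es.index.auto.create", "true")
val sc = new SparkContext(conf)
// the es.* settings travel with sc; nothing else to associate
sc.makeRDD(Seq(Map("field" -> "value"))).saveToEs("foo/docs")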
epoch date time problem to load data into in spark
Hi Friends, I have written a Spark Streaming program in Java to access Twitter tweets and it is working fine. I am able to copy the Twitter feeds to an HDFS location batch by batch. For each batch, it creates a folder with an epoch time stamp. For example, if I give the HDFS location as *hdfs://localhost:54310/twitter/*, the files are created like below:

*/spark/twitter/-144958080/
/spark/twitter/-144957984/*

I want to create folder names in yyyy-MM-dd-HH format instead of the default epoch format, like below, so that I can do Hive partitioning easily to access the data:

*/spark/twitter/2015-12-08-01/*

Can anyone help me? Thank you so much in advance. Thanks, Soniya
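One way to get that layout is to build the directory name yourself inside foreachRDD from the batch time, instead of relying on saveAsTextFiles' epoch suffix. A hedged sketch in Scala (the Java version is analogous; the stream name tweets and the HDFS prefix are assumptions):

import java.text.SimpleDateFormat
import java.util.Date

tweets.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    val hour = new SimpleDateFormat("yyyy-MM-dd-HH").format(new Date(time.milliseconds))
    // per-batch subdirectory: several batches can fall inside the same hour,
    // and saveAsTextFile fails if the target directory already exists
    rdd.saveAsTextFile(s"hdfs://localhost:54310/spark/twitter/$hour/${time.milliseconds}")
  }
}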
Re: SparkException: Failed to get broadcast_10_piece0
Pinging again ... On Wed, Nov 25, 2015 at 4:19 PM, Ted Yu <yuzhih...@gmail.com> wrote: > Which Spark release are you using ? > > Please take a look at: > https://issues.apache.org/jira/browse/SPARK-5594 > > Cheers > > On Wed, Nov 25, 2015 at 3:59 PM, Spark Newbie <sparknewbie1...@gmail.com> > wrote: > >> Hi Spark users, >> >> I'm seeing the below exceptions once in a while which causes tasks to >> fail (even after retries, so it is a non recoverable exception I think), >> hence stage fails and then the job gets aborted. >> >> Exception --- >> java.io.IOException: org.apache.spark.SparkException: Failed to get >> broadcast_10_piece0 of broadcast_10 >> >> Any idea why this exception occurs and how to avoid/handle these >> exceptions? Please let me know if you have seen this exception and know a >> fix for it. >> >> Thanks, >> Bharath >> > >
Error in block pushing thread puts the KinesisReceiver in a stuck state
Hi Spark users, I have been seeing an issue where receivers enter a "stuck" state after they encounter the following exception: "Error in block pushing thread - java.util.concurrent.TimeoutException: Futures timed out". I am running the application on spark-1.4.1 and using kinesis-asl-1.4. When this happens, the observation in CloudWatch is that the Kinesis.ProcessTask.shard.MillisBehindLatest metric no longer gets published, which indicates that the workers associated with the receiver are no longer checkpointing for the shards they were reading from. This seems like a bug in the BlockGenerator code, here - https://github.com/apache/spark/blob/branch-1.4/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala#L171 - when pushBlock encounters an exception, in this case the TimeoutException, it stops pushing blocks. Is this really expected behavior? Has anyone else seen this error, and have you also seen the issue where receivers stop receiving records? I'm also trying to find the root cause of the TimeoutException. If anyone has an idea, please share. Thanks, Bharath
SparkException: Failed to get broadcast_10_piece0
Hi Spark users, I'm seeing the below exception once in a while, which causes tasks to fail (even after retries, so it is a non-recoverable exception, I think); hence the stage fails and then the job gets aborted.

Exception ---
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_10_piece0 of broadcast_10

Any idea why this exception occurs and how to avoid/handle it? Please let me know if you have seen this exception and know a fix for it. Thanks, Bharath
Re: SparkException: Failed to get broadcast_10_piece0
Using Spark-1.4.1 On Wed, Nov 25, 2015 at 4:19 PM, Ted Yu <yuzhih...@gmail.com> wrote: > Which Spark release are you using ? > > Please take a look at: > https://issues.apache.org/jira/browse/SPARK-5594 > > Cheers > > On Wed, Nov 25, 2015 at 3:59 PM, Spark Newbie <sparknewbie1...@gmail.com> > wrote: > >> Hi Spark users, >> >> I'm seeing the below exceptions once in a while which causes tasks to >> fail (even after retries, so it is a non recoverable exception I think), >> hence stage fails and then the job gets aborted. >> >> Exception --- >> java.io.IOException: org.apache.spark.SparkException: Failed to get >> broadcast_10_piece0 of broadcast_10 >> >> Any idea why this exception occurs and how to avoid/handle these >> exceptions? Please let me know if you have seen this exception and know a >> fix for it. >> >> Thanks, >> Bharath >> > >
Spark twitter streaming in Java
Dear Friends, I am struggling with Spark Twitter streaming. I am not getting any data. Please correct the code below if you find any mistakes.

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.twitter.*;
import twitter4j.GeoLocation;
import twitter4j.Status;
import java.util.Arrays;
import scala.Tuple2;

public class SparkTwitterStreaming {
    public static void main(String[] args) {
        final String consumerKey = "XXX";
        final String consumerSecret = "XX";
        final String accessToken = "XX";
        final String accessTokenSecret = "XXX";
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkTwitterStreaming");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(6));
        System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
        System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
        System.setProperty("twitter4j.oauth.accessToken", accessToken);
        System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
        String[] filters = new String[] {"Narendra Modi"};
        JavaReceiverInputDStream twitterStream = TwitterUtils.createStream(jssc, filters);
        // Without filter: Output text of all tweets
        JavaDStream statuses = twitterStream.map(
            new Function<Status, String>() {
                public String call(Status status) { return status.getText(); }
            }
        );
        statuses.print();
        statuses.dstream().saveAsTextFiles("/home/apache/tweets", "txt");
    }
}
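One bug is visible even before the credentials question: the snippet never starts the streaming context, so no receiver runs and no data will ever arrive. The missing tail (shown in Scala; the Java calls are identical by name):

jssc.start()            // actually begin receiving
jssc.awaitTermination() // block the main thread until stopped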
Re: s3a file system and spark deployment mode
Are you using EMR? You can install Hadoop-2.6.0 along with Spark-1.5.1 in your EMR cluster, and that brings the s3a jars to the worker nodes so they become available to your application. On Thu, Oct 15, 2015 at 11:04 AM, Scott Reynolds <sreyno...@twilio.com> wrote: > List, > > Right now we build our spark jobs with the s3a hadoop client. We do this > because our machines are only allowed to use IAM access to the s3 store. We > can build our jars with the s3a filesystem and the aws sdk just fine and > this jars run great in *client mode*. > > We would like to move from client mode to cluster mode as that will allow > us to be more resilient to driver failure. In order to do this either: > 1. the jar file has to be on worker's local disk > 2. the jar file is in shared storage (s3a) > > We would like to put the jar file in s3 storage, but when we give the jar > path as s3a://.., the worker node doesn't have the hadoop s3a and aws > sdk in its classpath / uber jar. > > Other then building spark with those two dependencies, what other options > do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a thing. > > Need to get s3a access to both the master (so that we can log spark event > log to s3) and to the worker processes (driver, executor). > > Looking for ideas before just adding the dependencies to our spark build > and calling it a day. >
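Outside EMR, one hedged alternative is to ship the two jars explicitly so both the driver and the executors see them on their classpaths; the jar versions and paths below are assumptions (hadoop-aws 2.6.0 pairs with aws-java-sdk 1.7.4, and the jars must already exist at those paths on every node):

spark-submit --deploy-mode cluster \
  --conf spark.driver.extraClassPath=/opt/jars/hadoop-aws-2.6.0.jar:/opt/jars/aws-java-sdk-1.7.4.jar \
  --conf spark.executor.extraClassPath=/opt/jars/hadoop-aws-2.6.0.jar:/opt/jars/aws-java-sdk-1.7.4.jar \
  your-app.jar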
Re: Spark 1.5 java.net.ConnectException: Connection refused
What is the best way to fail the application when job gets aborted? On Wed, Oct 14, 2015 at 1:27 PM, Tathagata Das <t...@databricks.com> wrote: > When a job gets aborted, it means that the internal tasks were retried a > number of times before the system gave up. You can control the number > retries (see Spark's configuration page). The job by default does not get > resubmitted. > > You could try getting the logs of the failed executor, to see what caused > the failure. Could be a memory limit issue, and YARN killing it somehow. > > > > On Wed, Oct 14, 2015 at 11:05 AM, Spark Newbie <sparknewbie1...@gmail.com> > wrote: > >> Is it slowing things down or blocking progress. >> >> I didn't see slowing of processing, but I do see jobs aborted >> consecutively for a period of 18 batches (5 minute batch intervals). So I >> am worried about what happened to the records that these jobs were >> processing. >> Also, one more thing to mention is that the >> StreamingListenerBatchCompleted.numRecords information shows all >> received records as processed even if the batch/job failed. The processing >> time as well shows as the same time it takes for a successful batch. >> It seems like it is the numRecords which was the input to the batch >> regardless of whether they were successfully processed or not. >> >> On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie <sparknewbie1...@gmail.com >> > wrote: >> >>> I ran 2 different spark 1.5 clusters that have been running for more >>> than a day now. I do see jobs getting aborted due to task retry's maxing >>> out (default 4) due to ConnectionException. It seems like the executors die >>> and get restarted and I was unable to find the root cause (same app code >>> and conf used on spark 1.4.1 I don't see ConnectionException). >>> >>> Another question related to this, what happens to the kinesis records >>> received when Job gets aborted? In Spark-1.5 and kinesis-asl-1.5 (which I >>> am using) does the job gets resubmitted with the same received records? Or >>> does the kinesis-asl library get those records again based on sequence >>> numbers it tracks? It would good for me to understand the story around >>> lossless processing of kinesis records in Spark-1.5 + kinesis-asl-1.5 when >>> jobs are aborted. Any pointers or quick explanation would be very helpful. >>> >>> >>> On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das <t...@databricks.com> >>> wrote: >>> >>>> Is this happening too often? Is it slowing things down or blocking >>>> progress. Failures once in a while is part of the norm, and the system >>>> should take care of itself. >>>> >>>> On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie < >>>> sparknewbie1...@gmail.com> wrote: >>>> >>>>> Hi Spark users, >>>>> >>>>> I'm seeing the below exception in my spark streaming application. It >>>>> happens in the first stage where the kinesis receivers receive records and >>>>> perform a flatMap operation on the unioned Dstream. A coalesce step also >>>>> happens as a part of that stage for optimizing the performance. >>>>> >>>>> This is happening on my spark 1.5 instance using kinesis-asl-1.5. When >>>>> I look at the executor logs I do not see any exceptions indicating the >>>>> root >>>>> cause of why there is no connectivity on xxx.xx.xx.xxx:36684 or when did >>>>> that service go down. >>>>> >>>>> Any help debugging this problem will be helpful. 
>>>>> >>>>> 15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while >>>>> beginning fetch of 1 outstanding blocks >>>>> java.io.IOException: Failed to connect to >>>>> ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684 >>>>> at >>>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193) >>>>> at >>>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) >>>>> at >>>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88) >>>>> at >>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) >>>>> at >>>>> org
Spark 1.5 java.net.ConnectException: Connection refused
Hi Spark users,

I'm seeing the below exception in my Spark Streaming application. It happens in the first stage, where the Kinesis receivers receive records and perform a flatMap operation on the unioned DStream. A coalesce step also happens as part of that stage to optimize performance.

This is happening on my Spark 1.5 instance using kinesis-asl-1.5. When I look at the executor logs I do not see any exceptions indicating the root cause of why there is no connectivity on xxx.xx.xx.xxx:36684 or when that service went down.

Any help debugging this problem will be appreciated.

15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)
at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:623)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684

Thanks, Bharath
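[Editor's sketch of the configuration knobs usually reached for with this failure mode; the values below are illustrative, not recommendations from the thread. Longer timeouts and more fetch retries give executors that die and restart a chance to come back before tasks burn through their attempts:]

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.network.timeout", "300s")      // default 120s on 1.5
      .set("spark.shuffle.io.maxRetries", "10")  // default 3
      .set("spark.shuffle.io.retryWait", "15s")  // default 5s
      .set("spark.task.maxFailures", "8")        // default 4, the limit being hit above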
DEBUG level log in receivers and executors
Hi Spark users, Is there an easy way to turn on DEBUG logs in receivers and executors? Setting sparkContext.setLogLevel seems to turn on DEBUG level only on the Driver. Thanks,
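[One commonly used approach, sketched here under the assumption of log4j-based logging (file name illustrative): ship a custom log4j.properties to every executor with --files and point the executor JVMs at it, since setLogLevel only touches the driver's logger.]

    # log4j-executor.properties
    log4j.rootCategory=DEBUG, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

Then submit with:

    spark-submit \
      --files log4j-executor.properties \
      --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-executor.properties" \
      ...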
Re: Spark checkpoint restore failure due to s3 consistency issue
Unfortunately I don't have the before-stop logs anymore, since the log was overwritten by my next run. I created the rdd-_$folder$ file in S3 that was missing compared with the other checkpointed rdd- directories, and the app then started without the IllegalArgumentException. Do you still need the after-restart log4j logs? I can send them if that will help dig into the root cause.

On Fri, Oct 9, 2015 at 2:18 PM, Tathagata Das <t...@databricks.com> wrote:
> Can you provide the before-stop and after-restart log4j logs for this?
>
> On Fri, Oct 9, 2015 at 2:13 PM, Spark Newbie <sparknewbie1...@gmail.com> wrote:
>> Hi Spark Users,
>> I'm seeing checkpoint restore failures causing the application startup to fail with the below exception. When I do "ls" on the S3 path, I see the key listed sometimes and not listed other times. There are no part files (checkpointed files) in the specified S3 path. This is possible because I killed the app and restarted it as part of my testing, to see whether the kinesis-asl library's implementation of lossless Kinesis receivers works.
>> Has anyone seen the below exception before? If so, is there a recommended way to handle this case?
>>
>> 15/10/09 21:02:09 DEBUG S3NativeFileSystem: getFileStatus could not find key ''
>> Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: <S3 path to the checkpointed rdd>
>> at scala.Predef$.require(Predef.scala:233)
>> at org.apache.spark.rdd.ReliableCheckpointRDD.<init>(ReliableCheckpointRDD.scala:45)
>> at org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
>> at org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:700)
>> at org.apache.spark.SparkContext.checkpointFile(SparkContext.scala:1217)
>> at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:112)
>> at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:109)
>> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>> at org.apache.spark.streaming.dstream.DStreamCheckpointData.restore(DStreamCheckpointData.scala:109)
>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:487)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
>> at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
>> at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
>>
Spark checkpoint restore failure due to s3 consistency issue
Hi Spark Users,

I'm seeing checkpoint restore failures causing the application startup to fail with the below exception. When I do "ls" on the S3 path, I see the key listed sometimes and not listed other times. There are no part files (checkpointed files) in the specified S3 path. This is possible because I killed the app and restarted it as part of my testing, to see whether the kinesis-asl library's implementation of lossless Kinesis receivers works.

Has anyone seen the below exception before? If so, is there a recommended way to handle this case?

15/10/09 21:02:09 DEBUG S3NativeFileSystem: getFileStatus could not find key ''
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: <S3 path to the checkpointed rdd>
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.rdd.ReliableCheckpointRDD.<init>(ReliableCheckpointRDD.scala:45)
at org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
at org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:700)
at org.apache.spark.SparkContext.checkpointFile(SparkContext.scala:1217)
at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:112)
at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:109)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at org.apache.spark.streaming.dstream.DStreamCheckpointData.restore(DStreamCheckpointData.scala:109)
at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:487)
at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGraph.scala:153)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:158)
at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:837)
at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:837)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:837)
at foo$.getStreamingContext(foo.scala:72)

Thanks, Bharath
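[For reference, the restore path being exercised in this thread is StreamingContext.getOrCreate; a minimal sketch follows (checkpoint path and batch interval illustrative). Since S3's eventual consistency is what makes the directory check flaky here, a consistent store such as HDFS is the safer checkpoint target.]

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "s3n://my-bucket/checkpoints"   // assumed path

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(new SparkConf(), Seconds(300))
      ssc.checkpoint(checkpointDir)
      // ... set up the DStream graph here ...
      ssc
    }

    // Restores from the checkpoint if one exists, otherwise builds a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)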
Getting an error when trying to read a GZIPPED file
Folks, I have an input file which is gzipped. I use sc.textFile("foo.gz") and I see the following problem. Can someone help me fix this?

15/09/03 10:05:32 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/09/03 10:05:32 INFO CodecPool: Got brand-new decompressor [.gz]
15/09/03 10:06:15 WARN MemoryStore: Not enough space to cache rdd_2_0 in memory! (computed 216.3 MB so far)
15/09/03 10:06:15 INFO MemoryStore: Memory use = 156.2 KB (blocks) + 213.1 MB (scratch space shared across 1 thread(s)) = 213.3 MB. Storage limit = 265.1 MB.
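[Editor's note, hedged: gzip is not a splittable codec, so the entire decompressed file lands in a single partition, which easily overruns a ~265 MB storage budget. A sketch of the usual workaround (partition count illustrative):]

    import org.apache.spark.storage.StorageLevel

    val lines = sc.textFile("foo.gz").repartition(16)  // spread rows out after the single-split read
    lines.persist(StorageLevel.MEMORY_AND_DISK)        // spill to disk instead of dropping blocks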
Data Frame support CSV or excel format ?
Hi all, Can we create a data frame from an Excel sheet or a CSV file? In the example below, it seems only JSON is supported:

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
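[On the 1.x line, CSV support comes from the external spark-csv package rather than the built-in reader; a hedged sketch (package version and file path illustrative). Excel has no built-in data source, so exporting sheets to CSV first is the usual route.]

    // started with: spark-shell --packages com.databricks:spark-csv_2.10:1.2.0
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")        // first line carries the column names
      .option("inferSchema", "true")   // sample the file to guess column types
      .load("examples/src/main/resources/people.csv")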
Spark
I was running a Spark job to crunch a 9GB Apache log file when I saw the following error:

15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal): ExecutorLostFailure (executor 29 lost)
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 40), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 86), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 84), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 22), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 48), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 12), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)
15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove executor 29 from BlockManagerMaster.
15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(29, ip-10-150-137-100.ap-southeast-1.compute.internal, 39411)
.
.
Encountered Exception An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1380)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143)
.
.
Looking further, it seems that takeOrdered (called by my application) uses collect() internally and hence drains all the driver memory:

line 361, in top10EndPoints
    topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
File "/home/hadoop/spark/python/pyspark/rdd.py", line 1174, in takeOrdered
    return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
File "/home/hadoop/spark/python/pyspark/rdd.py", line 739, in reduce
    vals = self.mapPartitions(func).collect()
File "/home/hadoop/spark/python/pyspark/rdd.py", line 713, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)

How can I rewrite this code

endpointCounts = (access_logs
                  .map(lambda log: (log.endpoint, 1))
                  .reduceByKey(lambda a, b: a + b))
# endpointCounts is now an RDD of tuples: [(endpoint1, count1), (endpoint2, count2), ...]
topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])

so that this error does not happen?
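[Editor's note, hedged: takeOrdered keeps only a bounded heap of num elements per partition, and the collect() in the traceback gathers those small per-partition heaps, not the whole RDD, so the real culprit is more likely the lost executor in the log above. For comparison, the same top-N pattern in Scala (names illustrative), which likewise only returns the winners to the driver:]

    val endpointCounts = accessLogs
      .map(log => (log.endpoint, 1L))
      .reduceByKey(_ + _)

    // Only the 10 winning tuples ever travel back to the driver.
    val topEndpoints = endpointCounts.top(10)(Ordering.by[(String, Long), Long](_._2))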
How to parse multiple event types using Kafka
Folks, I use the following Streaming API from KafkaUtils:

public JavaPairInputDStream<String, String> inputDStream() {
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);
    return KafkaUtils.createDirectStream(streamingContext, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
}

I catch the messages using:

JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

My problem is, each of these Kafka topics streams in a different message type. How do I distinguish messages that are of type1 from messages that are of type2, etc.? I tried the following:

private class ParseEvents<T> implements Function<String, T> {
    final Class<T> parameterClass;

    private ParseEvents(Class<T> parameterClass) {
        this.parameterClass = parameterClass;
    }

    public T call(String message) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        T parsedMessage = null;
        try {
            parsedMessage = mapper.readValue(message, this.parameterClass);
        } catch (Exception e1) {
            logger.error("Ignoring unknown message %s", message);
        }
        return parsedMessage;
    }
}

JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));
JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));
JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));

But this does not work, because the type1 parser catches type2 messages and ignores them. Is there a clean way of handling this?
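[One clean-ish alternative, sketched by the editor in Scala under the assumption that each topic carries exactly one message type; the Type1/Type2 case classes, the parse helper, and the topic names are hypothetical, and ssc and brokers are assumed to be in scope. Giving each topic its own direct stream means the expected type is known per stream and no parser has to guess:]

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map("metadata.broker.list" -> brokers)

    // One direct stream per topic; each stream carries exactly one type.
    def topicStream(topic: String) =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(topic)).map(_._2)

    val type1Events = topicStream("topic-type1").map(msg => parse[Type1](msg))
    val type2Events = topicStream("topic-type2").map(msg => parse[Type2](msg))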
How to automatically relaunch a Driver program after crashes?
Folks, As I see it, the driver program is a single point of failure. Now, I have seen ways to make it recover from failures on a restart (using checkpointing), but I have not seen anything about how to restart it automatically if it crashes. Will running the driver as a Hadoop YARN application do it? Can someone educate me on how?
Re: How to automatically relaunch a Driver program after crashes?
Thanks for the reply. Are Standalone and Mesos the only options? Is there a way to auto-relaunch if the driver runs as a Hadoop YARN application? On Wednesday, 19 August 2015 12:49 PM, Todd bit1...@163.com wrote: There is an option for spark-submit (Spark standalone or Mesos with cluster deploy mode only): --supervise. If given, it restarts the driver on failure. At 2015-08-19 14:55:39, Spark Enthusiast sparkenthusi...@yahoo.in wrote: Folks, As I see it, the driver program is a single point of failure. Now, I have seen ways to make it recover from failures on a restart (using checkpointing), but I have not seen anything about how to restart it automatically if it crashes. Will running the driver as a Hadoop YARN application do it? Can someone educate me on how?
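[For the archive, a sketch of the invocation Todd describes (host, class, and jar names illustrative); as he notes, --supervise only applies to standalone or Mesos cluster deploy mode. On YARN cluster mode, the ResourceManager re-attempts a failed application master, bounded by spark.yarn.maxAppAttempts, which to my understanding is the closest YARN analogue:]

    spark-submit \
      --master spark://master-host:7077 \
      --deploy-mode cluster \
      --supervise \
      --class com.example.MyStreamingApp \
      my-streaming-app.jar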
Re: Not seeing Log messages
Forgot to mention. Here is how I run the program:

./bin/spark-submit --conf spark.app.master=local[1] ~/workspace/spark-python/ApacheLogWebServerAnalysis.py

On Wednesday, 12 August 2015 10:28 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote:

I wrote a small python program:

def parseLogs(self):
    """Read and parse log file"""
    self._logger.debug("Parselogs() start")
    self.parsed_logs = (self._sc
                        .textFile(self._logFile)
                        .map(self._parseApacheLogLine)
                        .cache())
    self.access_logs = (self.parsed_logs
                        .filter(lambda s: s[1] == 1)
                        .map(lambda s: s[0])
                        .cache())
    self.failed_logs = (self.parsed_logs
                        .filter(lambda s: s[1] == 0)
                        .map(lambda s: s[0]))
    failed_logs_count = self.failed_logs.count()
    if failed_logs_count > 0:
        self._logger.debug('Number of invalid loglines: %d' % self.failed_logs.count())
        for line in self.failed_logs.take(20):
            self._logger.debug('Invalid logline: %s' % line)
    self._logger.debug('Read %d lines, successfully parsed %d lines, failed to parse %d lines' %
                       (self.parsed_logs.count(), self.access_logs.count(), self.failed_logs.count()))
    return (self.parsed_logs, self.access_logs, self.failed_logs)

def main(argv):
    try:
        logger = createLogger("pyspark", logging.DEBUG, "LogAnalyzer.log", "./")
        logger.debug("Starting LogAnalyzer")
        myLogAnalyzer = ApacheLogAnalyzer(logger)
        (parsed_logs, access_logs, failed_logs) = myLogAnalyzer.parseLogs()
    except Exception as e:
        print "Encountered Exception %s" % str(e)
    logger.debug('Read %d lines, successfully parsed %d lines, failed to parse %d lines' %
                 (parsed_logs.count(), access_logs.count(), failed_logs.count()))
    logger.info("DONE. ALL TESTS PASSED")

I see some log messages:
Starting LogAnalyzer
Parselogs() start
DONE. ALL TESTS PASSED

But I do not see this log message:
Read %d lines, successfully parsed %d lines, failed to parse %d lines

For the line logger.debug('Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count())) I get the following error:

Encountered Exception Cannot pickle files that are not opened for reading

I do not have a clue as to what's happening. Any help will be appreciated.
How do I Process Streams that span multiple lines?
All examples of Spark Streaming programming that I see assume streams of lines that are then tokenised and acted upon (like the WordCount example). How do I process streams that span multiple lines? Are there examples that I can use?
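[For file-based input, a hedged sketch: Hadoop's new-API TextInputFormat honours a configurable record delimiter, so a blank line (assumed here, along with the file name) can mark the end of a multi-line record. For socket or Kafka streams you would assemble records yourself, e.g. inside a custom receiver or with stateful parsing.]

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    hadoopConf.set("textinputformat.record.delimiter", "\n\n")  // blank line ends a record

    val records = sc
      .newAPIHadoopFile("events.log", classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text], hadoopConf)
      .map(_._2.toString)  // each element is now one multi-line record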
Re: Spark 1.3.1 + Hive: write output to CSV with header on S3
Hi Roberto, I have a question regarding HiveContext: when you create a HiveContext, where do you define the Hive connection properties? Suppose Hive is not on the local machine and I need to connect: how will the HiveContext know the database info, like the URL, username and password?

String username = "";
String password = "";
String url = "jdbc:hive2://quickstart.cloudera:1/default";

On Friday, July 17, 2015 2:29 AM, Roberto Coluccio roberto.coluc...@gmail.com wrote:

Hello community, I'm currently using Spark 1.3.1 with Hive support for outputting processed data to an external Hive table backed on S3. I'm using a manual specification of the delimiter, but I'd like to know whether there is any clean way to write in CSV format:

val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._

hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS table_name(field1 STRING, field2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '" + path_on_s3 + "'")
hiveContext.sql(an INSERT OVERWRITE query to write into the above table)

I also need the header of the table to be printed in each written file. I tried with:

hiveContext.sql("set hive.cli.print.header=true")

But it didn't work. Any hint? Thank you. Best regards, Roberto
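[A workaround sketch from the editor for the header question (the resultDF DataFrame and column handling are assumed): prepend the header as its own one-line RDD before saving. This is only safe when coalescing to a single output file, since with several part files the header lands in just one of them:]

    val header = sc.parallelize(Seq("field1,field2"), 1)
    val rows = resultDF.rdd.map(r => s"${r(0)},${r(1)}")
    (header ++ rows).coalesce(1).saveAsTextFile("s3n://bucket/output")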
Re: Java 8 vs Scala
I struggled a lot with Scala: almost 10 days, no improvement. But when I switched to Java 8, things were smooth, and I used DataFrames with Redshift and Hive and everything looked good. If you are very good in Scala then go with Scala; otherwise Java is the best fit. This is just my opinion, because I am a Java guy. On Wednesday, July 15, 2015 12:33 PM, vaquar khan vaquar.k...@gmail.com wrote: My choice is Java 8. On 15 Jul 2015 18:03, Alan Burlison alan.burli...@oracle.com wrote: On 15/07/2015 08:31, Ignacio Blasco wrote: The main advantage of using Scala vs Java 8 is being able to use a console. https://bugs.openjdk.java.net/browse/JDK-8043364 -- Alan Burlison
Data Frame for nested json
Does DataFrame support nested JSON to dump directly to a database? For simple JSON it works fine:

{"id":2,"name":"Gerald","email":"gbarn...@zimbio.com","city":"Štoky","country":"Czech Republic","ip":"92.158.154.75"}

But for nested JSON it fails to load:

root
 |-- rows: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cell: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

2015-07-14 14:50:05 [Thread-0] INFO SparkContext:59 - Invoking stop() from shutdown hook
Exception in thread "main" java.lang.IllegalArgumentException: Don't know how to save StructField(rows,ArrayType(StructType(StructField(cell,ArrayType(StringType,true),true)),true),true) to JDBC
at org.apache.spark
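[The JDBC writer can only map atomic column types, so nested columns have to be flattened before the write; a hedged sketch on Spark 1.4+, with column names taken from the schema above and the JDBC coordinates assumed:]

    import org.apache.spark.sql.functions.{col, explode}

    val df = sqlContext.read.json("nested.json")
    val flat = df
      .select(explode(df("rows")).as("row"))        // one output row per array element
      .select(explode(col("row.cell")).as("cell"))  // unwrap the inner string array

    val url = "jdbc:postgresql://db-host/mydb"      // assumed target database
    val props = new java.util.Properties()
    flat.write.jdbc(url, "my_table", props)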
Java 8 vs Scala
Hi All, To start a new project in Spark, which technology is good: Java 8 or Scala? I am a Java developer. Can I start with Java 8, or do I need to learn Scala? Which one is the better technology for a quick-start POC project? Thanks - su
Re: spark - redshift !!!
Hi, I am looking at how to load data into Redshift. Thanks. On Wednesday, July 8, 2015 12:47 AM, shahab shahab.mok...@gmail.com wrote: Hi, I did some experiments with loading data from S3 into Spark. I loaded data from S3 using sc.textFile(). Have a look at the following code snippet:

val csv = sc.textFile("s3n://mybucket/myfile.csv")
// my data is in CSV format, comma separated
val rdd = csv.map(line => line.split(",").map(elem => elem.trim))
  .map(r => MyObject(r(3), r(4).toLong, r(5).toLong, r(6))) // just map it to the target object format

hope this helps, best, /Shahab On Wed, Jul 8, 2015 at 12:57 AM, spark user spark_u...@yahoo.com.invalid wrote: Hi, Can you help me with how to load data from an S3 bucket into Redshift? If you have sample code, can you please send it to me? Thanks, su
SparkR dataFrame read.df fails to read from aws s3
I have Spark 1.4 deployed on AWS EMR, but the SparkR dataFrame read.df method cannot load data from AWS S3.

1) read.df error message:

read.df(sqlContext, "s3://some-bucket/some.json", "json")
15/07/09 04:07:01 ERROR r.RBackendHandler: loadDF on org.apache.spark.sql.api.r.SQLUtils failed
java.lang.IllegalArgumentException: invalid method loadDF for object org.apache.spark.sql.api.r.SQLUtils
at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:143)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:74)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:36)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)

2) jsonFile is working, though with a warning message:

Warning message:
In normalizePath(path) : path[1]=s3://rea-consumer-data-dev/cbr/profiler/output/20150618/part-0: No such file or directory
spark - redshift !!!
Hi, Can you help me with how to load data from an S3 bucket into Redshift? If you have sample code, can you please send it to me? Thanks, su
Can a Spark Driver Program be a REST Service by itself?
Folks, My use case is as follows: my driver program will be aggregating a bunch of event streams and acting on them. The action on the aggregated events is configurable and can change dynamically. One way I can think of is to run the Spark driver as a service, where a config push can be caught via an API that the driver exports. Can I have a Spark driver program run as a REST service by itself? Is this a common use case? Is there a better way to solve my problem? Thanks
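[One minimal pattern, sketched by the editor with the JDK's built-in HttpServer (port, path, and the string-valued config are all illustrative): the driver process starts a tiny HTTP endpoint, stores each pushed config in an AtomicReference, and the driver-side batch logic, e.g. inside foreachRDD, reads it before acting:]

    import java.net.InetSocketAddress
    import java.util.concurrent.atomic.AtomicReference
    import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

    val currentAction = new AtomicReference[String]("default-action")

    val server = HttpServer.create(new InetSocketAddress(8080), 0)
    server.createContext("/config", new HttpHandler {
      override def handle(exchange: HttpExchange): Unit = {
        val body = scala.io.Source.fromInputStream(exchange.getRequestBody).mkString
        currentAction.set(body)                  // swap in the new action config
        exchange.sendResponseHeaders(200, -1)    // -1 means no response body
        exchange.close()
      }
    })
    server.start()   // serves on a background thread; ssc.start() can follow

[Anything more elaborate (validation, auth, JSON bodies) usually argues for an embedded web framework, but the pattern is the same: the HTTP server and the streaming context simply share the driver JVM.]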
Can I do Joins across Event Streams ?
Hi, I have to build a system that reacts to a set of events. Each of these events is a separate stream by itself, consumed from a different Kafka topic and hence with a different InputDStream. Questions: Will I be able to do joins across multiple InputDStreams and collate the output using a single accumulator? These event streams can each have their own frequency of occurrence. How will I be able to coordinate the out-of-sync behaviour?
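[A hedged sketch of the join half (event types and field names assumed): key both DStreams on a shared correlation id and join them per batch; windows absorb arrival skew between the sources. Note that accumulators are write-only from executors, so collating the joined output is usually done in a driver-side foreachRDD rather than through an accumulator:]

    import org.apache.spark.streaming.Minutes

    val keyedA = streamA.map(e => (e.correlationId, e))   // assumed event types
    val keyedB = streamB.map(e => (e.correlationId, e))

    // Each batch joins the last 5 minutes of both sources, sliding every minute.
    val joined = keyedA.window(Minutes(5), Minutes(1))
      .join(keyedB.window(Minutes(5), Minutes(1)))        // (id, (eventA, eventB))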
Serialization Exception
For prototyping purposes, I created a test program injecting dependencies using Spring. Nothing fancy. This is just a re-write of KafkaDirectWordCount. When I run this, I get the following exception:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:43)
at com.olacabs.spark.examples.WordCountProcessorKafkaImpl.process(WordCountProcessorKafkaImpl.java:45)
at com.olacabs.spark.examples.WordCountApp.main(WordCountApp.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.

Serialization stack:
- object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@7add323c)
- field (class: com.olacabs.spark.examples.WordCountProcessorKafkaImpl, name: streamingContext, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
- object (class com.olacabs.spark.examples.WordCountProcessorKafkaImpl, com.olacabs.spark.examples.WordCountProcessorKafkaImpl@29a1505c)
- field (class: com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, name: this$0, type: class com.olacabs.spark.examples.WordCountProcessorKafkaImpl)
- object (class com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1@c6c82aa)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, function1)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 23 more

Can someone help me figure out why? Here is the code:

public interface EventProcessor extends Serializable {
    void process();
}

public class WordCountProcessorKafkaImpl implements EventProcessor {
    private static final Pattern SPACE = Pattern.compile(" ");

    @Autowired
    @Qualifier("streamingContext")
    JavaStreamingContext streamingContext;

    @Autowired
    @Qualifier("inputDStream")
    JavaPairInputDStream<String, String> inputDStream;

    @Override
    public void process() {
        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = inputDStream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return
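[The general cure, sketched by the editor in Scala (field name illustrative): never let a transformation's closure capture the enclosing object, because that drags the non-serializable streaming context along with it; copy what the lambda needs into a local val first. In the Java code above, the equivalent fix is making the anonymous Function a static nested class, or at least marking the injected streamingContext field transient, so that the this$0 reference disappears from the serialization stack:]

    import org.apache.spark.streaming.dstream.DStream

    class WordCountProcessor(separatorField: String) extends Serializable {
      def process(input: DStream[(String, String)]): DStream[String] = {
        val separator = separatorField   // local copy: `this` stays out of the closure
        input.map { case (_, value) => value.split(separator)(0) }
      }
    }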
Re: s3 bucket access/read file
Please check your ACL properties. On Monday, June 29, 2015 11:29 AM, didi did...@gmail.com wrote:

Hi, *Can't read a text file from S3 to create an RDD* after setting the configuration:

val hadoopConf = sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", yourAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", yourSecretKey)

1. Running the following:

val hfile = sc.textFile("s3n://mybucket/temp/")

I get the error:

Exception in thread "main" org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/temp' - ResponseCode=400, ResponseMessage=Bad Request

2. Running the following:

val hfile = sc.textFile("s3n://mybucket/*.txt")

I get the error:

Exception in thread "main" org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 GET failed for '/' XML Error Message:
<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidRequest</Code><Message>*The authorization mechanism you have provided is not supported. Please use AWS4-HMAC-SHA256.*</Message><RequestId>C2174C316DEC91CB</RequestId><HostId>3oPZfZoPZUbvzXJdVaUGl9N0oI1buMx+A/wJiisx7uZ0bpnTkwsaT6i0fhYhjY97JDWBX1x/2Y8=</HostId></Error>

I read that it has something to do with the V4 signature. Isn't it supported by the SDK?

3. Running the following:

val hfile = sc.textFile("s3n://mybucket")

I get the error:

Exception in thread "main" org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/user%2Fdidi' - ResponseCode=400, ResponseMessage=Bad Request

What does the user have to do here? I am using a key and secret! How can I simply create an RDD from a text file on S3? Thanks, Didi
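[On the V4 point specifically, a hedged note from the editor: the jets3t-based s3n connector cannot produce V4 signatures, so buckets in V4-only regions (Frankfurt, for example) reject it. On Hadoop 2.6+ the s3a connector with an explicit regional endpoint is the usual way out; it needs the hadoop-aws and AWS SDK jars on the classpath, and the endpoint below is illustrative:]

    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.s3a.access.key", yourAccessKey)
    hadoopConf.set("fs.s3a.secret.key", yourSecretKey)
    hadoopConf.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

    val hfile = sc.textFile("s3a://mybucket/temp/")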
Re: Scala/Python or Java
Spark is based on Scala and is written in Scala. To debug and fix issues, I guess learning Scala is good for the long term? Any advice? On Thursday, June 25, 2015 1:26 PM, ayan guha guha.a...@gmail.com wrote: I am a Python fan so I use Python. But I have noticed that some features are typically 1-2 releases behind for Python. So I strongly agree with Ted: start with the language you are most familiar with and plan to move to Scala eventually. On 26 Jun 2015 06:07, Ted Yu yuzhih...@gmail.com wrote: The answer depends on the user's experience with these languages as well as the most commonly used language in the production environment. Learning Scala requires some time. If you're very comfortable with Java / Python, you can go with that while at the same time familiarizing yourself with Scala. Cheers On Thu, Jun 25, 2015 at 12:04 PM, spark user spark_u...@yahoo.com.invalid wrote: Hi All, I am new to Spark. I just want to know which technology is good/best for learning Spark? 1) Scala 2) Java 3) Python I know Spark supports all 3 languages, but which one is best? Thanks, su
Scala/Python or Java
Hi All, I am new to Spark. I just want to know which technology is good/best for learning Spark? 1) Scala 2) Java 3) Python I know Spark supports all 3 languages, but which one is best? Thanks, su
Re: Spark or Storm
Again, by Storm, you mean Storm Trident, correct?

On Wednesday, 17 June 2015 10:09 PM, Michael Segel msegel_had...@hotmail.com wrote:

Actually the reverse. Spark Streaming is really a micro-batch system where the smallest window is half a second (500ms), so for CEP it's not really a good idea. So in terms of options: Spark Streaming, Storm, Samza, Akka and others. Storm is probably the easiest to pick up; Spark Streaming / Akka may give you more flexibility, and Akka would work for CEP. Just my $0.02.

On Jun 16, 2015, at 9:40 PM, Spark Enthusiast sparkenthusi...@yahoo.in wrote:

I have a use case where a stream of incoming events has to be aggregated and joined to create complex events. The aggregation will have to happen at an interval of 1 minute (or less). The pipeline is: Upstream services ---send events---> KAFKA ---> Event Stream Processor ---enrich event---> Complex Event Processor ---> Elastic Search. From what I understand, Storm will make a very good ESP and Spark Streaming will make a good CEP. But we are also evaluating Storm with Trident. How does Spark Streaming compare with Storm with Trident? Sridhar Chellappa

On Wednesday, 17 June 2015 10:02 AM, ayan guha guha.a...@gmail.com wrote:

I have a similar scenario where we need to bring data from Kinesis to HBase. Data velocity is 20k per 10 mins. Little manipulation of the data will be required, but that's regardless of the tool, so we will be writing that piece as a Java POJO. All env is on AWS. HBase is on a long-running EMR and Kinesis on a separate cluster. TIA. Best, Ayan

On 17 Jun 2015 12:13, Will Briggs wrbri...@gmail.com wrote:

The programming models for the two frameworks are conceptually rather different; I haven't worked with Storm for quite some time, but based on my old experience with it, I would equate Spark Streaming more with Storm's Trident API, rather than with the raw Bolt API. Even then, there are significant differences, but it's a bit closer. If you can share your use case, we might be able to provide better guidance. Regards, Will

On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote:

Hi All, I am evaluating Spark vs. Storm (Spark Streaming) and I am not able to see what the equivalent of a Storm Bolt is inside Spark. Any help will be appreciated on this. Thanks, Ashish
Re: Spark or Storm
When you say Storm, did you mean Storm with Trident, or plain Storm? My use case does not have simple transformations; there are complex events that need to be generated by joining the incoming event streams. Also, what do you mean by no back pressure?

On Wednesday, 17 June 2015 11:57 AM, Enno Shioji eshi...@gmail.com wrote:

We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm. Some of the important drawbacks are: Spark has no back pressure (the receiver rate limit can alleviate this to a certain point, but it's far from ideal). There is also no exactly-once semantics (updateStateByKey can achieve these semantics, but it is not practical if you have any significant amount of state, because it does so by dumping the entire state on every checkpoint). There are also some minor drawbacks that I'm sure will be fixed quickly, like no task timeout, not being able to read from Kafka using multiple nodes, and a data-loss hazard with Kafka. It's also not possible to attain very low latency in Spark, if that's what you need. The plus for Spark is the concise and, IMO, more intuitive syntax, especially if you compare it with Storm's Java API. I admit I might be a bit biased towards Storm though, as I'm more familiar with it. Also, you can do some processing with Kinesis. If all you need to do is straightforward transformation and you are reading from Kinesis to begin with, it might be an easier option to just do the transformation in Kinesis.

On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote:

Whatever you write in bolts would be the logic you want to apply on your events. In Spark, that logic would be coded in map() or similar such transformations and/or actions. Spark doesn't enforce a structure for capturing your processing logic like Storm does. Regards, Sab

Probably overloading the question a bit. In Storm, Bolts have the functionality of getting triggered on events. Is that kind of functionality possible with Spark Streaming? During each phase of the data processing, the transformed data is stored to the database, and this transformed data should then be sent to a new pipeline for further processing. How can this be achieved using Spark?

On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote:

I have a use case where a stream of incoming events has to be aggregated and joined to create complex events. The aggregation will have to happen at an interval of 1 minute (or less). The pipeline is: Upstream services ---send events---> KAFKA ---> Event Stream Processor ---enrich event---> Complex Event Processor ---> Elastic Search. From what I understand, Storm will make a very good ESP and Spark Streaming will make a good CEP. But we are also evaluating Storm with Trident. How does Spark Streaming compare with Storm with Trident? Sridhar Chellappa

On Wednesday, 17 June 2015 10:02 AM, ayan guha guha.a...@gmail.com wrote:

I have a similar scenario where we need to bring data from Kinesis to HBase. Data velocity is 20k per 10 mins. Little manipulation of the data will be required, but that's regardless of the tool, so we will be writing that piece as a Java POJO. All env is on AWS. HBase is on a long-running EMR and Kinesis on a separate cluster. TIA. Best, Ayan

On 17 Jun 2015 12:13, Will Briggs wrbri...@gmail.com wrote:

The programming models for the two frameworks are conceptually rather different; I haven't worked with Storm for quite some time, but based on my old experience with it, I would equate Spark Streaming more with Storm's Trident API, rather than with the raw Bolt API. Even then, there are significant differences, but it's a bit closer. If you can share your use case, we might be able to provide better guidance. Regards, Will

On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote:

Hi All, I am evaluating Spark vs. Storm (Spark Streaming) and I am not able to see what the equivalent of a Storm Bolt is inside Spark. Any help will be appreciated on this. Thanks, Ashish
Re[2]: HBase 0.96+ with Spark 1.0+
Hi guys, any luck with this issue, anyone? I as well tried all the possible exclusion combos, to no avail. Thanks for your ideas, reinis

-----Original Message-----
From: Stephen Boesch java...@gmail.com
To: user user@spark.apache.org
Date: 28-06-2014 15:12
Subject: Re: HBase 0.96+ with Spark 1.0+

Hi Siyuan, Thanks for the input. We are preferring to use the SparkBuild.scala instead of maven. I did not see any protobuf.version related settings in that file. But - as noted by Sean Owen - in any case the issue we are facing presently is about the duplicate incompatible javax.servlet entries - apparently from the org.mortbay artifacts.

2014-06-28 6:01 GMT-07:00 Siyuan he hsy...@gmail.com: Hi Stephen, I am using Spark 1.0 + HBase 0.96.2. This is what I did:
1) rebuild Spark using: mvn -Dhadoop.version=2.3.0 -Dprotobuf.version=2.5.0 -DskipTests clean package
2) in spark-env.sh, set SPARK_CLASSPATH=/path-to/hbase-protocol-0.96.2-hadoop2.jar
Hopefully it can help. Siyuan

On Sat, Jun 28, 2014 at 8:52 AM, Stephen Boesch java...@gmail.com wrote: Thanks Sean. I had actually already added an exclusion rule for org.mortbay.jetty - and that had not resolved it. Just in case, I used your precise formulation:

val excludeMortbayJetty = ExclusionRule(organization = "org.mortbay.jetty")
..
,("org.apache.spark" % "spark-core_2.10" % sparkVersion withSources()).excludeAll(excludeMortbayJetty)
,("org.apache.spark" % "spark-sql_2.10" % sparkVersion withSources()).excludeAll(excludeMortbayJetty)

However the same error still recurs:

14/06/28 05:48:35 INFO HttpServer: Starting HTTP Server
[error] (run-main-0) java.lang.SecurityException: class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package
java.lang.SecurityException: class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package

2014-06-28 4:22 GMT-07:00 Sean Owen so...@cloudera.com: This sounds like an instance of roughly the same item as in https://issues.apache.org/jira/browse/SPARK-1949 Have a look at adding that exclude to see if it works.

On Fri, Jun 27, 2014 at 10:21 PM, Stephen Boesch java...@gmail.com wrote: The present trunk is built and tested against HBase 0.94. I have tried various combinations of versions of HBase 0.96+ and Spark 1.0+ and all end up with:

14/06/27 20:11:15 INFO HttpServer: Starting HTTP Server
[error] (run-main-0) java.lang.SecurityException: class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package
java.lang.SecurityException: class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package
at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)

I have tried a number of different ways to exclude javax.servlet related jars, but none have avoided this error. Anyone have a (small-ish) build.sbt that works with later versions of HBase?
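[A hedged build.sbt sketch for anyone landing on this thread (versions illustrative): the signed javax.servlet classes typically ride in through the HBase/Hadoop transitive tree as well, so the exclusions have to cover those dependencies too, not only the Spark ones:]

    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-core_2.10" % sparkVersion,
      ("org.apache.hbase" % "hbase-client" % "0.96.2-hadoop2")
        .exclude("org.mortbay.jetty", "servlet-api-2.5")   // signed servlet jar via jetty
        .exclude("javax.servlet", "servlet-api")           // plain servlet-api duplicate
    )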