Re: Get full RDD lineage for a Spark job
You could also enable it with --conf spark.logLineage=true if you do not want to change any code.

Regards,
Keith.
http://keith-chapman.com

On Fri, Jul 21, 2017 at 7:57 PM, Keith Chapman wrote:
> Hi Ron,
>
> You can try using the toDebugString method on the RDD; this will print
> the RDD lineage.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Fri, Jul 21, 2017 at 11:24 AM, Ron Gonzalez <zlgonza...@yahoo.com.invalid> wrote:
>
>> Hi,
>> Can someone point me to a test case or share sample code that is able
>> to extract the RDD graph from a Spark job anywhere during its lifecycle? I
>> understand that Spark has a UI that can show the graph of the execution, so
>> I'm hoping it is using some API somewhere that I could use.
>> I know the RDD graph is the actual execution graph, so if there is also a more
>> logical abstraction API, closer to calls like map, filter, aggregate, etc.,
>> that would be even better.
>> Appreciate any help...
>>
>> Thanks,
>> Ron
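For reference, a minimal sketch of the suggestion above (the RDD chain here is illustrative, not from the original job):

```scala
// Minimal sketch: inspect an RDD's lineage with toDebugString.
val rdd = sc.parallelize(1 to 100)
  .map(_ * 2)
  .filter(_ % 4 == 0)

// Prints one line per RDD in the lineage, indented at shuffle boundaries.
println(rdd.toDebugString)
```

With spark.logLineage=true set, the same lineage string is logged automatically whenever an action runs, so no code change is needed.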
Re: Get full RDD lineage for a Spark job
Hi Ron,

You can try using the toDebugString method on the RDD; this will print the RDD lineage.

Regards,
Keith.
http://keith-chapman.com

On Fri, Jul 21, 2017 at 11:24 AM, Ron Gonzalez wrote:
> Hi,
> Can someone point me to a test case or share sample code that is able to
> extract the RDD graph from a Spark job anywhere during its lifecycle? I
> understand that Spark has a UI that can show the graph of the execution, so
> I'm hoping it is using some API somewhere that I could use.
> I know the RDD graph is the actual execution graph, so if there is also a more
> logical abstraction API, closer to calls like map, filter, aggregate, etc.,
> that would be even better.
> Appreciate any help...
>
> Thanks,
> Ron
Spark job crashes intermittently with FileNotFoundException during shuffle
Hi,

I have several Spark jobs, both batch and streaming, that process and analyze system logs. We are using Kafka as the pipeline connecting the jobs. After upgrading to Spark 2.1.0 + Spark Kafka Streaming 010, I found that some of the jobs (both batch and streaming) throw the exception below at random (either after several hours of running, or within just 20 minutes). Can anyone give me some suggestions about how to figure out the real root cause? (Google results have not been very useful so far...)

Thanks,
Martin

00:30:04,510 WARN - 17/07/22 00:30:04 WARN TaskSetManager: Lost task 60.0 in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0): java.io.FileNotFoundException: /mnt/mesos/work_dir/slaves/20160924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a-4df9-b034-8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-4f37-a106-27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-c35643e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6 (No such file or directory)
00:30:04,510 WARN - at java.io.FileOutputStream.open0(Native Method)
00:30:04,510 WARN - at java.io.FileOutputStream.open(FileOutputStream.java:270)
00:30:04,510 WARN - at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
00:30:04,510 WARN - at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
00:30:04,510 WARN - at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
00:30:04,510 WARN - at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128)
00:30:04,510 WARN - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
00:30:04,510 WARN - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
00:30:04,510 WARN - at org.apache.spark.scheduler.Task.run(Task.scala:99)
00:30:04,510 WARN - at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
00:30:04,510 WARN - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
00:30:04,510 WARN - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
00:30:04,510 WARN - at java.lang.Thread.run(Thread.java:748)
00:30:04,580 INFO - Driver stacktrace:
00:30:04,580 INFO - org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
00:30:04,580 INFO - org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
00:30:04,580 INFO - org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
00:30:04,580 INFO - scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
00:30:04,580 INFO - scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
00:30:04,580 INFO - org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
00:30:04,580 INFO - org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
00:30:04,580 INFO - org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
00:30:04,580 INFO - scala.Option.foreach(Option.scala:257)
00:30:04,580 INFO - org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
00:30:04,580 INFO - org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
00:30:04,580 INFO - org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
00:30:04,580 INFO - org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
00:30:04,580 INFO - org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
00:30:04,580 INFO - org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
00:30:04,580 INFO - org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
00:30:04,580 INFO - org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
00:30:04,580 INFO - org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
00:30:04,580 INFO - org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1353)
00:30:04,580 INFO - org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
00:30:04,580 INFO - org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
00:30:04,580 INFO - org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
00:30:04,580 INFO - org.apache.spark.rdd.RDD.take(RDD.scala:1326)
00:30:04,580 INFO - org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1461)
00:30:04,580 INFO - org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1461)
00:30:04,580 INFO - org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1461)
00:30:04,580 INFO - org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
00:30:04,580 INFO - org.apache.spark.rdd.RDDOperationScope$.withScope(
[Spark] Working with JavaPairRDD from Scala
Hi,

I would like to call a method on JavaPairRDD from Scala, and I am not sure how to write a function for the "map". I am using a third-party library that uses Spark for geospatial computations, and it happens to return some results through the Java API. I'd welcome a hint on how to write a function for 'map' such that JavaPairRDD is happy.

Here's the signature:

org.apache.spark.api.java.JavaPairRDD[com.vividsolutions.jts.geom.Polygon,java.util.HashSet[com.vividsolutions.jts.geom.Polygon]]

Normally I would write something like this:

def calculate_intersection(polygon: Polygon, hashSet: HashSet[Polygon]) = {
  (polygon, hashSet.asScala.map(polygon.intersection(_).getArea))
}

javapairrdd.map(calculate_intersection)

... but it will complain that it's not a Java Function. My first thought was to implement the interface, i.e.:

class PairRDDWrapper
  extends org.apache.spark.api.java.function.Function2[Polygon, HashSet[Polygon], (Polygon, scala.collection.mutable.Set[Double])] {

  override def call(polygon: Polygon, hashSet: HashSet[Polygon]): (Polygon, scala.collection.mutable.Set[Double]) = {
    (polygon, hashSet.asScala.map(polygon.intersection(_).getArea))
  }
}

I am not sure, though, how to use it, or whether it makes any sense in the first place. It should be simple; it's just that my Java / Scala is a little rusty.

Cheers,
Lucas
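One way to sidestep the Java functional interfaces entirely (a hedged sketch, not from the original thread) is to drop down to the underlying Scala RDD, since JavaPairRDD wraps an RDD[(K, V)]:

```scala
import scala.collection.JavaConverters._
import com.vividsolutions.jts.geom.Polygon
import java.util.HashSet

// Sketch: JavaPairRDD.rdd exposes the wrapped Scala RDD[(K, V)],
// so a plain Scala pattern-matching function works for the map.
val scalaRdd = javapairrdd.rdd  // RDD[(Polygon, HashSet[Polygon])]

val areas = scalaRdd.map { case (polygon, hashSet) =>
  (polygon, hashSet.asScala.map(polygon.intersection(_).getArea))
}
```

If you stay on the Java API instead, note that JavaPairRDD.map takes an org.apache.spark.api.java.function.Function over scala.Tuple2[K, V], not a Function2 over the two components, and Function2 needs a third type parameter for the result.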
Fwd: Spark Structured Streaming - Spark Consumer does not display messages
Hi,

This is the first time I am trying structured streaming with Kafka. I have simple code to read from Kafka and display it on the console. The message is in JSON format. However, when I run my code, nothing after the lines below gets printed.

17/07/21 13:43:41 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
17/07/21 13:43:41 INFO StreamExecution: Starting new streaming query.
17/07/21 13:43:42 INFO AbstractCoordinator: Discovered coordinator XXX:9092 (id: 2147483647 rack: null) for group spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0.
17/07/21 13:43:42 INFO AbstractCoordinator: Marking the coordinator XXX:9092 (id: 2147483647 rack: null) dead for group spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0

The code is:

Dataset<Row> kafkaStream = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", config.getString("kafka.host") + ":" + config.getString("kafka.port"))
    .option("subscribe", "test")
    .load();
//kafkaStream.printSchema();

//JSON ::: {"id":1,"name":"MySelf"}
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.IntegerType, false),
    DataTypes.createStructField("name", DataTypes.StringType, false)});

Dataset<KafkaMessage> streamingSelectDF = kafkaStream
    .selectExpr("CAST(value AS STRING) as message")
    .select(functions.from_json(functions.col("message"), schema).as("json"))
    .select("json.*")
    .as(Encoders.bean(KafkaMessage.class));

streamingSelectDF.createOrReplaceTempView("MyView");

Dataset<Row> streamData = spark.sql("SELECT count(*) from MyView");

StreamingQuery streamingQuery = streamData.writeStream()
    .format("console")
    .outputMode("complete")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start();

try {
  streamingQuery.awaitTermination();
} catch (StreamingQueryException e) {
  e.printStackTrace();
}

Regards,
Leena
Re: Spark 2.0 and Oracle 12.1 error
Hi Xiao,

I am trying the JSON sample table provided by Oracle 12c. It is on the website: https://docs.oracle.com/database/121/ADXDB/json.htm#ADXDB6371

CREATE TABLE j_purchaseorder
  (id          RAW (16) NOT NULL,
   date_loaded TIMESTAMP WITH TIME ZONE,
   po_document CLOB
   CONSTRAINT ensure_json CHECK (po_document IS JSON));

The data that I inserted was:

{ "PONumber" : 1600,
  "Reference" : "ABULL-20140421",
  "Requestor" : "Alexis Bull",
  "User" : "ABULL",
  "CostCenter" : "A50",
  "ShippingInstructions" : {
    "name" : "Alexis Bull",
    "Address" : { "street" : "200 Sporting Green",
                  "city" : "South San Francisco",
                  "state" : "CA",
                  "zipCode" : 99236,
                  "country" : "United States of America" },
    "Phone" : [ { "type" : "Office", "number" : "909-555-7307" },
                { "type" : "Mobile", "number" : "415-555-1234" } ] },
  "Special Instructions" : null,
  "AllowPartialShipment" : false,
  "LineItems" : [ { "ItemNumber" : 1,
                    "Part" : { "Description" : "One Magic Christmas",
                               "UnitPrice" : 19.95,
                               "UPCCode" : 13131092899 },
                    "Quantity" : 9.0 },
                  { "ItemNumber" : 2,
                    "Part" : { "Description" : "Lethal Weapon",
                               "UnitPrice" : 19.95,
                               "UPCCode" : 85391628927 },
                    "Quantity" : 5.0 } ] }

On Fri, Jul 21, 2017 at 10:12 AM, Xiao Li wrote:
> Could you share the schema of your Oracle table and open a JIRA?
>
> Thanks!
>
> Xiao
>
> 2017-07-21 9:40 GMT-07:00 Cassa L:
>> I am using 2.2.0. I resolved the problem by removing SELECT * and adding
>> column names to the SELECT statement. That works. I'm wondering why SELECT
>> * will not work.
>>
>> Regards,
>> Leena
>>
>> On Fri, Jul 21, 2017 at 8:21 AM, Xiao Li wrote:
>>> Could you try 2.2? We fixed multiple Oracle-related issues in the latest
>>> release.
>>>
>>> Thanks
>>>
>>> Xiao
>>>
>>> On Wed, 19 Jul 2017 at 11:10 PM Cassa L wrote:
>>>> Hi,
>>>> I am trying to use Spark to read from an Oracle (12.1) table using Spark 2.0.
>>>> My table has JSON data. I am getting the below exception in my code. Any clue?
>>>>
>>>> java.sql.SQLException: Unsupported type -101
>>>>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:233)
>>>>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
>>>>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
>>>>   at scala.Option.getOrElse(Option.scala:121)
>>>>   at
>>>>
>>>> ==
>>>> My code is very simple.
>>>>
>>>> SparkSession spark = SparkSession
>>>>     .builder()
>>>>     .appName("Oracle Example")
>>>>     .master("local[4]")
>>>>     .getOrCreate();
>>>>
>>>> final Properties connectionProperties = new Properties();
>>>> connectionProperties.put("user", "some_user");
>>>> connectionProperties.put("password", "some_pwd");
>>>>
>>>> final String dbTable = "(select * from MySampleTable)";
>>>>
>>>> Dataset<Row> jdbcDF = spark.read().jdbc(URL, dbTable, connectionProperties);
Unsubscribe
Please unsubscribe.

Thanks
--
Cornelio Iñigo
Re: Spark Data Frame Writer - Range Partitioning
How about creating a partition column and using it?

On Sat, 22 Jul 2017 at 2:47 am, Jain, Nishit wrote:
> Is it possible to have the Spark Data Frame Writer write based on
> RangePartitioning?
>
> For example:
>
> I have 10 distinct values for column_a, say 1 to 10.
>
> df.write
>   .partitionBy("column_a")
>
> The above code by default will create 10 folders: column_a=1, column_a=2,
> ... column_a=10.
>
> I want to see if it is possible to have these partitions based on buckets -
> col_a=1to5, col_a=5-10, or something like that - and then also have the query
> engine respect it.
>
> Thanks,
>
> Nishit

--
Best Regards,
Ayan Guha
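A sketch of the suggestion above; the bucket boundaries, label strings, and output path are illustrative, not from the thread:

```scala
import org.apache.spark.sql.functions.{col, when}

// Sketch: derive an explicit bucket column, then partition by it.
// Bucket labels and output path are made up for this example.
val bucketed = df.withColumn("col_a_bucket",
  when(col("column_a") <= 5, "1to5").otherwise("6to10"))

bucketed.write
  .partitionBy("col_a_bucket")
  .parquet("/tmp/bucketed_output")
```

Query engines that prune on Hive-style directory layout will then prune on col_a_bucket; predicates on column_a itself, however, won't prune automatically unless rewritten against the bucket column.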
[ Spark SQL ] Conversion from Spark SQL to Avro decimal
Hi,

I'm having problems converting decimal data from Spark SQL to Avro. I'm trying to write an Avro file with the following schema for the decimal field from a Spark application:

{
  "name" : "num",
  "type" : ["null", {"type": "bytes", "logicalType": "decimal", "precision": 3, "scale": 1}],
  "doc" : "durata"
}

Then I convert the number from String to BigDecimal, set the scale value, convert it to a byte array, and write the Avro file. But when I try to query the inserted data (with both Hive and Spark SQL) I get this error:

Caused by: org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Failed to obtain scale value from file schema: "bytes"
...
Caused by: java.lang.NullPointerException

Is the error caused by incorrect serialization of the Avro file, or by something else? Has anyone encountered this issue? Any suggestions?

Thanks,
Ernesto
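The Hive message ("from file schema: \"bytes\"") suggests the logicalType/precision/scale attributes may not have made it into the writer schema of the file, since Hive reads the scale from there. A hedged sketch of encoding one value (the helper name and scale are assumptions based on the schema above):

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}
import java.nio.ByteBuffer

// Sketch: Avro's decimal logical type stores the *unscaled* value as
// two's-complement big-endian bytes. The scale is carried only by the
// schema, so it must match the schema's "scale" attribute exactly.
def toAvroDecimalBytes(value: String, scale: Int): ByteBuffer = {
  val scaled = new JBigDecimal(value).setScale(scale, RoundingMode.HALF_UP)
  ByteBuffer.wrap(scaled.unscaledValue().toByteArray)
}
```

If the file is written with a plain "bytes" schema (without the decimal attributes), the bytes look identical but readers such as Hive cannot recover the scale, which matches the exception above.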
Get full RDD lineage for a Spark job
Hi,

Can someone point me to a test case or share sample code that is able to extract the RDD graph from a Spark job anywhere during its lifecycle? I understand that Spark has a UI that can show the graph of the execution, so I'm hoping it is using some API somewhere that I could use.

I know the RDD graph is the actual execution graph, so if there is also a more logical abstraction API, closer to calls like map, filter, aggregate, etc., that would be even better.

Appreciate any help...

Thanks,
Ron
Unsubscribe
Please unsubscribe me.
Re: Spark 2.0 and Oracle 12.1 error
Could you share the schema of your Oracle table and open a JIRA?

Thanks!

Xiao

2017-07-21 9:40 GMT-07:00 Cassa L:
> I am using 2.2.0. I resolved the problem by removing SELECT * and adding
> column names to the SELECT statement. That works. I'm wondering why SELECT
> * will not work.
>
> Regards,
> Leena
>
> On Fri, Jul 21, 2017 at 8:21 AM, Xiao Li wrote:
>> Could you try 2.2? We fixed multiple Oracle-related issues in the latest
>> release.
>>
>> Thanks
>>
>> Xiao
>>
>> On Wed, 19 Jul 2017 at 11:10 PM Cassa L wrote:
>>> Hi,
>>> I am trying to use Spark to read from an Oracle (12.1) table using Spark
>>> 2.0. My table has JSON data. I am getting the below exception in my code.
>>> Any clue?
>>>
>>> java.sql.SQLException: Unsupported type -101
>>>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:233)
>>>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
>>>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
>>>   at scala.Option.getOrElse(Option.scala:121)
>>>   at
>>>
>>> ==
>>> My code is very simple.
>>>
>>> SparkSession spark = SparkSession
>>>     .builder()
>>>     .appName("Oracle Example")
>>>     .master("local[4]")
>>>     .getOrCreate();
>>>
>>> final Properties connectionProperties = new Properties();
>>> connectionProperties.put("user", "some_user");
>>> connectionProperties.put("password", "some_pwd");
>>>
>>> final String dbTable = "(select * from MySampleTable)";
>>>
>>> Dataset<Row> jdbcDF = spark.read().jdbc(URL, dbTable, connectionProperties);
Spark Data Frame Writer - Range Partitioning
Is it possible to have the Spark Data Frame Writer write based on RangePartitioning?

For example:

I have 10 distinct values for column_a, say 1 to 10.

df.write
  .partitionBy("column_a")

The above code by default will create 10 folders: column_a=1, column_a=2, ... column_a=10.

I want to see if it is possible to have these partitions based on buckets - col_a=1to5, col_a=5-10, or something like that - and then also have the query engine respect it.

Thanks,
Nishit
Re: Spark 2.0 and Oracle 12.1 error
I am using 2.2.0. I resolved the problem by removing SELECT * and adding column names to the SELECT statement. That works. I'm wondering why SELECT * will not work.

Regards,
Leena

On Fri, Jul 21, 2017 at 8:21 AM, Xiao Li wrote:
> Could you try 2.2? We fixed multiple Oracle-related issues in the latest
> release.
>
> Thanks
>
> Xiao
>
> On Wed, 19 Jul 2017 at 11:10 PM Cassa L wrote:
>> Hi,
>> I am trying to use Spark to read from an Oracle (12.1) table using Spark
>> 2.0. My table has JSON data. I am getting the below exception in my code.
>> Any clue?
>>
>> java.sql.SQLException: Unsupported type -101
>>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:233)
>>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
>>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
>>   at scala.Option.getOrElse(Option.scala:121)
>>   at
>>
>> ==
>> My code is very simple.
>>
>> SparkSession spark = SparkSession
>>     .builder()
>>     .appName("Oracle Example")
>>     .master("local[4]")
>>     .getOrCreate();
>>
>> final Properties connectionProperties = new Properties();
>> connectionProperties.put("user", "some_user");
>> connectionProperties.put("password", "some_pwd");
>>
>> final String dbTable = "(select * from MySampleTable)";
>>
>> Dataset<Row> jdbcDF = spark.read().jdbc(URL, dbTable, connectionProperties);
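For what it's worth, JDBC type code -101 appears to be Oracle's TIMESTAMP WITH TIME ZONE, which Spark's JDBC source does not map; SELECT * then fails during schema resolution, while a named-column query that skips the offending column succeeds. A hedged sketch (in Scala) of an alternative: cast the column inside the pushed-down subquery so the driver reports a plain TIMESTAMP. Table and column names follow the sample table earlier in this thread:

```scala
// Sketch (untested): push the cast down to Oracle so the JDBC driver
// reports TIMESTAMP instead of the unsupported TIMESTAMP WITH TIME ZONE
// (JDBC type code -101) for date_loaded.
val dbTable =
  """(select id,
    |        cast(date_loaded as timestamp) as date_loaded,
    |        po_document
    |   from j_purchaseorder)""".stripMargin

val jdbcDF = spark.read.jdbc(url, dbTable, connectionProperties)
```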
Re: Spark on Cloudera Configuration (Scheduler Mode = FAIR)
On Fri, Jul 21, 2017 at 5:00 AM, Gokula Krishnan D wrote:
> Is there any way we can set up the scheduler mode at the Spark cluster
> level, besides at the application (SC) level?

That's handled by the cluster (or resource) manager - e.g., configure separate queues in YARN with a maximum amount of resources for each.

--
Marcelo

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
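For YARN's fair scheduler, that boils down to queue definitions in fair-scheduler.xml; the queue names and limits below are illustrative, not from the thread:

```xml
<?xml version="1.0"?>
<!-- Illustrative fair-scheduler.xml: two queues with resource caps.
     Queue names and limits are made up for this example. -->
<allocations>
  <queue name="etl">
    <maxResources>40960 mb,20 vcores</maxResources>
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>
  <queue name="adhoc">
    <maxResources>20480 mb,10 vcores</maxResources>
    <weight>1.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>
</allocations>
```

Applications are then submitted with spark-submit --queue etl (or adhoc), and YARN preemption can additionally be enabled if one queue should reclaim resources from another.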
Supporting columns with heterogeneous data
What is a good way to support non-homogeneous input data in Structured Streaming?

Let me explain the use case that we are trying to solve. We are reading data from 3 topics in Kafka. All the topics have data in Avro format, each with its own schema. All 3 Avro schemas have 2 columns: EventHeader and EventPayload. EventHeader has the same structure in all 3 topics. However, each topic has a different schema for EventPayload. The EventHeader has a field called eventType that dictates the structure of the EventPayload.

What we want to do is some streaming analytics across all 3 topics. At a high level, we want to:

a) read the data from all 3 topics from Kafka,
b) transform it into a single streaming data frame that contains data from all 3 topics - this data frame should have a superset of the fields from all 3 schemas,
c) do some aggregation/windowing on this data frame,
d) write to a database.

The issue is how exactly we do b). Let's say EventPayload in Topic A has fields x, y, z; in Topic B it has p, q and r; and in Topic C it has x, y and p. We need a data frame that contains eventID, x, y, z, p, q, r.

There are different approaches we are playing around with. Note that one of the goals we are trying to achieve is to keep the parsing logic separate.

Option 1
1) Read the Avro data from each Kafka topic as a byte array into its own data frame. So we have 3 data frames, each with 1 column of binary data.
2) Parse the Avro payload in each data frame. Now we have 3 data frames, all with 2 fields, EventHeader and EventPayload. The StructType of EventPayload is different in each of the 3 data frames.
3) Convert the 3 data frames into the standard format. The output is 3 data frames, all with the same structure: EventID, x, y, z, p, q, r.
4) Union these 3 data frames.

I am not sure if a union across different streaming data sources will work. Will it?

Option 2
1) Read the Avro data from all 3 Kafka topics as a byte array into a single data frame. So we have 1 data frame with 1 binary column.
2) Parse the Avro payload, converting EventPayload into some format that can store heterogeneous data, since different rows have different schemas for EventPayload. We could convert EventPayload to JSON and store it in a String column, or convert it to Avro and store it in a binary column.
3) Write a converter that re-parses the EventPayload column and creates a data frame with the standard structure.

Is there a better way of doing this?

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
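Steps 3-4 of Option 1 can be sketched as below; the column list and helper name are assumptions for illustration. Unions of streaming Datasets with identical schemas are supported in Structured Streaming:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Sketch: pad each topic's frame out to the superset of columns
// (null for any field a topic lacks), then union them.
// Column names and the string type for padding are assumptions.
val allCols = Seq("eventID", "x", "y", "z", "p", "q", "r")

def toStandardFormat(df: DataFrame): DataFrame = {
  val padded = allCols.foldLeft(df) { (d, c) =>
    if (d.columns.contains(c)) d else d.withColumn(c, lit(null).cast("string"))
  }
  padded.select(allCols.map(col): _*)  // fix column order before union
}

val unioned = toStandardFormat(dfTopicA)
  .union(toStandardFormat(dfTopicB))
  .union(toStandardFormat(dfTopicC))
```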
Re: Spark 2.0 and Oracle 12.1 error
Could you try 2.2? We fixed multiple Oracle-related issues in the latest release.

Thanks

Xiao

On Wed, 19 Jul 2017 at 11:10 PM Cassa L wrote:
> Hi,
> I am trying to use Spark to read from an Oracle (12.1) table using Spark 2.0.
> My table has JSON data. I am getting the below exception in my code. Any clue?
>
> java.sql.SQLException: Unsupported type -101
>
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:233)
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
>   at scala.Option.getOrElse(Option.scala:121)
>   at
>
> ==
> My code is very simple.
>
> SparkSession spark = SparkSession
>     .builder()
>     .appName("Oracle Example")
>     .master("local[4]")
>     .getOrCreate();
>
> final Properties connectionProperties = new Properties();
> connectionProperties.put("user", "some_user");
> connectionProperties.put("password", "some_pwd");
>
> final String dbTable = "(select * from MySampleTable)";
>
> Dataset<Row> jdbcDF = spark.read().jdbc(URL, dbTable, connectionProperties);
Re: Spark (SQL / Structured Streaming) Cassandra - PreparedStatement
The SCC (spark-cassandra-connector) includes the Java driver, which means you can just use Java driver functions. It also provides a serializable wrapper that does session and prepared-statement pooling. Something like:

val cc = CassandraConnector(sc.getConf)
SomeFunctionWithAnIterator { it: SomeIterator =>
  cc.withSessionDo { session =>
    val ps = session.prepare("statement")
    it.map(row => session.executeAsync(ps.bind(row)))
  }
}
// Do something with the futures here

I wrote a blog post about this here:
http://www.russellspitzer.com/2017/02/27/Concurrency-In-Spark/#concurrency-with-the-cassandra-java-driver

On Tue, Apr 11, 2017 at 4:05 AM Bastien DINE wrote:
> Hi everyone,
>
> I'm using Spark Structured Streaming for machine learning purposes in real
> time, and I want to store predictions in my Cassandra cluster.
>
> Since I am in a streaming context, executing the same request multiple times
> per second, one mandatory optimization is to use PreparedStatements.
>
> In the Cassandra Spark driver
> (https://github.com/datastax/spark-cassandra-connector) there is no way to
> use a PreparedStatement (in Scala or Python; I'm not considering Java as an
> option).
>
> Should I use a Scala (https://github.com/outworkers/phantom) / Python
> (https://github.com/datastax/python-driver) Cassandra driver?
>
> How does it work then? Does my connection object need to be serializable to
> be passed to the workers?
>
> If anyone can help me!
>
> Thanks :)
> Bastien
Re: Spark on Cloudera Configuration (Scheduler Mode = FAIR)
Mark & Ayan, thanks for the inputs.

Is there any way we can set up the scheduler mode at the Spark cluster level, besides at the application (SC) level?

Currently YARN is in FAIR mode, and we manually ensure that each Spark application is also in FAIR mode. However, we noticed that applications do not release resources as soon as their tasks are done when we set dynamic allocation = true without specifying any explicit executor allocation. At the moment we are specifying the min and max executor allocation at the Spark application level to ensure that all of our ETL Spark applications can run in parallel without resource issues.

It would be great if you could give more insight into how to set up preemption within YARN and Spark.

Thanks & Regards,
Gokula Krishnan (Gokul)

On Thu, Jul 20, 2017 at 6:46 PM, ayan guha wrote:
> Hi
>
> As Mark said, scheduler mode works within an application, i.e. within a
> Spark session and Spark context. This is also clear if you think about where
> you set the configuration - in a SparkConf used to build a context.
>
> If you are using YARN as the resource manager, however, you can set up YARN
> with the fair scheduler. If you do so, both of your applications will get
> "fair" treatment from YARN, i.e. get resources in a round-robin manner. If
> you want your App A to give up resources while it is using them, you need to
> set up preemption within YARN and set application priorities so that
> preemption can kick in.
>
> HTH...
>
> Best, Ayan
>
> On Fri, Jul 21, 2017 at 7:11 AM, Mark Hamstra wrote:
>> The fair scheduler doesn't have anything to do with reallocating resources
>> across applications.
>>
>> https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-across-applications
>> https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
>>
>> On Thu, Jul 20, 2017 at 2:02 PM, Gokula Krishnan D wrote:
>>> Mark, thanks for the response.
>>>
>>> Let me rephrase my statements.
>>>
>>> "I am submitting a Spark application (Application A) with scheduler.mode
>>> as FAIR and dynamic allocation = true, and it got all the available
>>> executors.
>>>
>>> In the meantime, I submitted another Spark application (Application B)
>>> with scheduler.mode as FAIR and dynamic allocation = true, but it got only
>>> one executor."
>>>
>>> Thanks & Regards,
>>> Gokula Krishnan (Gokul)
>>>
>>> On Thu, Jul 20, 2017 at 4:56 PM, Mark Hamstra wrote:
>>>> First, executors are not allocated to jobs, but rather to applications.
>>>> If you run multiple jobs within a single application, then each of the
>>>> tasks associated with stages of those jobs has the potential to run on
>>>> any of the application's executors. Second, once a task starts running on
>>>> an executor, it has to complete before another task can be scheduled
>>>> using the prior task's resources - the fair scheduler does not preempt
>>>> running tasks.
>>>>
>>>> On Thu, Jul 20, 2017 at 1:45 PM, Gokula Krishnan D wrote:
>>>>> Hello All,
>>>>>
>>>>> We have a cluster with 50 executors, each with 4 cores, so we can use
>>>>> at most 200 cores.
>>>>>
>>>>> I am submitting a Spark application (Job A) with scheduler.mode as FAIR
>>>>> and dynamic allocation = true, and it got all the available executors.
>>>>>
>>>>> In the meantime, I submitted another Spark application (Job B) with
>>>>> scheduler.mode as FAIR and dynamic allocation = true, but it got only
>>>>> one executor.
>>>>>
>>>>> Normally this situation occurs when one of the jobs runs with
>>>>> scheduler.mode = FIFO.
>>>>>
>>>>> 1) Have you ever faced this issue, and if so, how did you overcome it?
>>>>>
>>>>> I was under the impression that as soon as I submit Job B, the Spark
>>>>> scheduler would distribute/release some resources from Job A and share
>>>>> them with Job B in round-robin fashion.
>>>>>
>>>>> Appreciate your response!
>>>>>
>>>>> Thanks & Regards,
>>>>> Gokula Krishnan (Gokul)
>
> --
> Best Regards,
> Ayan Guha