Re: Get full RDD lineage for a spark job

2017-07-21 Thread Keith Chapman
You could also enable it with --conf spark.logLineage=true if you do not
want to change any code.
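
For reference, the same flag can also be set programmatically. A minimal
sketch (the app name, master and toy pipeline below are made up):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("lineage-logging-example")   // hypothetical app name
  .setMaster("local[*]")
  .set("spark.logLineage", "true")         // log each RDD's lineage when a job is submitted
val sc = new SparkContext(conf)

// Any action now logs the lineage (toDebugString output) of the final RDD.
sc.parallelize(1 to 10).map(_ * 2).filter(_ > 5).count()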

Regards,
Keith.

http://keith-chapman.com

On Fri, Jul 21, 2017 at 7:57 PM, Keith Chapman 
wrote:

> Hi Ron,
>
> You can try using the toDebugString method on the RDD, this will print
> the RDD lineage.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Fri, Jul 21, 2017 at 11:24 AM, Ron Gonzalez <
> zlgonza...@yahoo.com.invalid> wrote:
>
>> Hi,
>>   Can someone point me to a test case or share sample code that is able
>> to extract the RDD graph from a Spark job anywhere during its lifecycle? I
>> understand that Spark has a UI that can show the graph of the execution,
>> so I'm hoping it is using some API somewhere that I could use.
>>   I know the RDD is the actual execution graph, so if there is also a more
>> logical abstraction API closer to calls like map, filter, aggregate, etc.,
>> that would be even better.
>>   Appreciate any help...
>>
>> Thanks,
>> Ron
>>
>
>


Re: Get full RDD lineage for a spark job

2017-07-21 Thread Keith Chapman
Hi Ron,

You can try using the toDebugString method on the RDD, this will print the
RDD lineage.
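
A minimal sketch of that call, assuming a spark-shell with an existing
SparkContext named sc (the input path and the small pipeline are made up):

val wordCounts = sc.textFile("hdfs:///tmp/input.txt")   // hypothetical input path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Prints the RDD lineage: one indented line per parent RDD / dependency.
println(wordCounts.toDebugString)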

Regards,
Keith.

http://keith-chapman.com

On Fri, Jul 21, 2017 at 11:24 AM, Ron Gonzalez  wrote:

> Hi,
>   Can someone point me to a test case or share sample code that is able to
> extract the RDD graph from a Spark job anywhere during its lifecycle? I
> understand that Spark has a UI that can show the graph of the execution,
> so I'm hoping it is using some API somewhere that I could use.
>   I know the RDD is the actual execution graph, so if there is also a more
> logical abstraction API closer to calls like map, filter, aggregate, etc.,
> that would be even better.
>   Appreciate any help...
>
> Thanks,
> Ron
>


Spark Job crash due to File Not found when shuffle intermittently

2017-07-21 Thread Martin Peng
Hi,

I have several Spark jobs, both batch and streaming, that process our system
logs and analyze them. We use Kafka as the pipeline connecting the jobs.

After upgrading to Spark 2.1.0 + Spark Kafka Streaming 010, I found that some
of the jobs (both batch and streaming) randomly throw the exceptions below
(either after several hours of running or within just 20 minutes). Can anyone
give me some suggestions about how to find the real root cause?
(The Google results have not been very useful...)

Thanks,
Martin

00:30:04,510 WARN  - 17/07/22 00:30:04 WARN TaskSetManager: Lost task 60.0
in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0):
java.io.FileNotFoundException:
/mnt/mesos/work_dir/slaves/20160924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a-4df9-b034-8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-4f37-a106-27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-c35643e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6
(No such file or directory)
00:30:04,510 WARN  - at java.io.FileOutputStream.open0(Native Method)
00:30:04,510 WARN  - at
java.io.FileOutputStream.open(FileOutputStream.java:270)
00:30:04,510 WARN  - at
java.io.FileOutputStream.(FileOutputStream.java:213)
00:30:04,510 WARN  - at
java.io.FileOutputStream.(FileOutputStream.java:162)
00:30:04,510 WARN  - at
org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
00:30:04,510 WARN  - at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.Task.run(Task.scala:99)
00:30:04,510 WARN  - at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
00:30:04,510 WARN  - at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
00:30:04,510 WARN  - at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
00:30:04,510 WARN  - at java.lang.Thread.run(Thread.java:748)

00:30:04,580 INFO  - Driver stacktrace:
00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
00:30:04,580 INFO  -
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
00:30:04,580 INFO  -
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
00:30:04,580 INFO  - scala.Option.foreach(Option.scala:257)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
00:30:04,580 INFO  -
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1353)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
00:30:04,580 INFO  - org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
00:30:04,580 INFO  - org.apache.spark.rdd.RDD.take(RDD.scala:1326)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
00:30:04,580 INFO  -

[Spark] Working with JavaPairRDD from Scala

2017-07-21 Thread Lukasz Tracewski
Hi,

I would like to call a method on a JavaPairRDD from Scala, and I am not sure
how to write a function for the "map". I am using a third-party library that
uses Spark for geospatial computations, and it happens to return some results
through the Java API. I'd welcome a hint on how to write a function for 'map'
such that JavaPairRDD is happy.

Here's a signature:
org.apache.spark.api.java.JavaPairRDD[com.vividsolutions.jts.geom.Polygon,java.util.HashSet[com.vividsolutions.jts.geom.Polygon]]
 = org.apache.spark.api.java.JavaPairRDD

Normally I would write something like this:

def calculate_intersection(polygon: Polygon, hashSet: HashSet[Polygon]) = {
  (polygon, hashSet.asScala.map(polygon.intersection(_).getArea))
}

javapairrdd.map(calculate_intersection)


... but it will complain that it's not a Java Function.

My first thought was to implement the interface, i.e.:


class PairRDDWrapper
  extends org.apache.spark.api.java.function.Function2[Polygon, HashSet[Polygon]] {

  override def call(polygon: Polygon, hashSet: HashSet[Polygon]):
      (Polygon, scala.collection.mutable.Set[Double]) = {
    (polygon, hashSet.asScala.map(polygon.intersection(_).getArea))
  }
}




I am not sure, though, how to use it, or whether it makes any sense in the
first place. It should be simple; it's just that my Java / Scala is a little rusty.
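
One possible approach, sketched here as an assumption rather than something
the third-party library requires: JavaPairRDD is only a wrapper, so its .rdd
method exposes the underlying RDD[(K, V)], which accepts plain Scala
functions in map.

import scala.collection.JavaConverters._
import java.util.HashSet
import com.vividsolutions.jts.geom.Polygon

// Drop down to the Scala RDD and use an ordinary pattern-matching function.
val areas = javapairrdd.rdd.map { case (polygon: Polygon, neighbours: HashSet[Polygon]) =>
  (polygon, neighbours.asScala.map(other => polygon.intersection(other).getArea))
}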


Cheers,
Lucas


Fwd: Spark Structured Streaming - Spark Consumer does not display messages

2017-07-21 Thread Cassa L
Hi,
This is the first time I am trying Structured Streaming with Kafka. I have
simple code that reads from Kafka and displays the messages on the console.
The messages are in JSON format. However, when I run my code, nothing after
the lines below gets printed.

17/07/21 13:43:41 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
17/07/21 13:43:41 INFO StreamExecution: Starting new streaming query.
17/07/21 13:43:42 INFO AbstractCoordinator: Discovered coordinator XXX:9092
(id: 2147483647 rack: null) for group
spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0.
17/07/21 13:43:42 INFO AbstractCoordinator: Marking the coordinator XXX:9092
(id: 2147483647 rack: null) dead for group
spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0


Code is -

Dataset<Row> kafkaStream = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers",
        config.getString("kafka.host") + ":" + config.getString("kafka.port"))
    .option("subscribe", "test")
    .load();
// kafkaStream.printSchema();
// JSON ::: {"id":1,"name":"MySelf"}
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.IntegerType, false),
    DataTypes.createStructField("name", DataTypes.StringType, false)});

Dataset<KafkaMessage> streamingSelectDF =
    kafkaStream.selectExpr("CAST(value AS STRING) as message")
        .select(functions.from_json(functions.col("message"), schema).as("json"))
        .select("json.*")
        .as(Encoders.bean(KafkaMessage.class));
streamingSelectDF.createOrReplaceTempView("MyView");
Dataset<Row> streamData = spark.sql("SELECT count(*) from MyView");

StreamingQuery streamingQuery = streamData.writeStream()
    .format("console")
    .outputMode("complete")
    .trigger(Trigger.ProcessingTime("10 seconds")).start();
try {
    streamingQuery.awaitTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}


Regards,

Leena


Spark Structured Streaming - Spark Consumer does not display messages

2017-07-21 Thread Cassa L
Hi,
This is the first time I am trying Structured Streaming with Kafka. I have
simple code that reads from Kafka and displays the messages on the console.
The messages are in JSON format. However, when I run my code, nothing after
the lines below gets printed.

17/07/21 13:43:41 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
17/07/21 13:43:41 INFO StreamExecution: Starting new streaming query.
17/07/21 13:43:42 INFO AbstractCoordinator: Discovered coordinator XXX:9092
(id: 2147483647 rack: null) for group
spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0.
17/07/21 13:43:42 INFO AbstractCoordinator: Marking the coordinator XXX:9092
(id: 2147483647 rack: null) dead for group
spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0


Code is -

Dataset<Row> kafkaStream = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers",
        config.getString("kafka.host") + ":" + config.getString("kafka.port"))
    .option("subscribe", "test")
    .load();
// kafkaStream.printSchema();
// JSON ::: {"id":1,"name":"MySelf"}
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.IntegerType, false),
    DataTypes.createStructField("name", DataTypes.StringType, false)});

Dataset<KafkaMessage> streamingSelectDF =
    kafkaStream.selectExpr("CAST(value AS STRING) as message")
        .select(functions.from_json(functions.col("message"), schema).as("json"))
        .select("json.*")
        .as(Encoders.bean(KafkaMessage.class));
streamingSelectDF.createOrReplaceTempView("MyView");
Dataset<Row> streamData = spark.sql("SELECT count(*) from MyView");

StreamingQuery streamingQuery = streamData.writeStream()
    .format("console")
    .outputMode("complete")
    .trigger(Trigger.ProcessingTime("10 seconds")).start();
try {
    streamingQuery.awaitTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}


Regards,

Leena


Re: Spark 2.0 and Oracle 12.1 error

2017-07-21 Thread Cassa L
Hi Xiao,
I am using the JSON sample table provided by Oracle 12c. It is documented at
https://docs.oracle.com/database/121/ADXDB/json.htm#ADXDB6371

CREATE TABLE j_purchaseorder
   (id  RAW (16) NOT NULL,
date_loaded TIMESTAMP WITH TIME ZONE,
po_document CLOB
CONSTRAINT ensure_json CHECK (po_document IS JSON));

Data that I inserted was -

{ "PONumber" : 1600,
  "Reference": "ABULL-20140421",
  "Requestor": "Alexis Bull",
  "User" : "ABULL",
  "CostCenter"   : "A50",
  "ShippingInstructions" : { "name"   : "Alexis Bull",
 "Address": { "street"  : "200 Sporting Green",
  "city": "South San Francisco",
  "state"   : "CA",
  "zipCode" : 99236,
  "country" : "United States
of America" },
 "Phone" : [ { "type" : "Office", "number"
: "909-555-7307" },
 { "type" : "Mobile", "number"
: "415-555-1234" } ] },
  "Special Instructions" : null,
  "AllowPartialShipment" : false,
  "LineItems": [ { "ItemNumber" : 1,
   "Part"   : { "Description" : "One
Magic Christmas",
"UnitPrice"   : 19.95,
"UPCCode" : 13131092899 },
   "Quantity"   : 9.0 },
 { "ItemNumber" : 2,
   "Part"   : { "Description" : "Lethal Weapon",
"UnitPrice"   : 19.95,
"UPCCode" : 85391628927 },
   "Quantity"   : 5.0 } ] }


On Fri, Jul 21, 2017 at 10:12 AM, Xiao Li  wrote:

> Could you share the schema of your Oracle table and open a JIRA?
>
> Thanks!
>
> Xiao
>
>
> 2017-07-21 9:40 GMT-07:00 Cassa L :
>
>> I am using 2.2.0. I resolved the problem by removing SELECT * and adding
>> column names to the SELECT statement. That works. I'm wondering why SELECT
>> * will not work.
>>
>> Regards,
>> Leena
>>
>> On Fri, Jul 21, 2017 at 8:21 AM, Xiao Li  wrote:
>>
>>> Could you try 2.2? We fixed multiple Oracle related issues in the latest
>>> release.
>>>
>>> Thanks
>>>
>>> Xiao
>>>
>>>
>>> On Wed, 19 Jul 2017 at 11:10 PM Cassa L  wrote:
>>>
 Hi,
 I am trying to use Spark to read from Oracle (12.1) table using Spark
 2.0. My table has JSON data.  I am getting below exception in my code. Any
 clue?

 >
 java.sql.SQLException: Unsupported type -101

 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.o
 rg$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$
 getCatalystType(JdbcUtils.scala:233)
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$a
 nonfun$8.apply(JdbcUtils.scala:290)
 at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$a
 nonfun$8.apply(JdbcUtils.scala:290)
 at scala.Option.getOrElse(Option.scala:121)
 at

 ==
 My code is very simple.

 SparkSession spark = SparkSession
 .builder()
 .appName("Oracle Example")
 .master("local[4]")
 .getOrCreate();

 final Properties connectionProperties = new Properties();
 connectionProperties.put("user", *"some_user"*));
 connectionProperties.put("password", "some_pwd"));

 final String dbTable =
 "(select *  from  MySampleTable)";

 Dataset<Row> jdbcDF = spark.read().jdbc(URL, dbTable, connectionProperties);


>>
>


unsubscribe

2017-07-21 Thread Cornelio Iñigo
Please unsubscribe

Thanks

-- 
*Cornelio Iñigo*


Re: Spark Data Frame Writer - Range Partitioning

2017-07-21 Thread ayan guha
How about creating a partition column and using it?
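
A minimal sketch of that suggestion (the bucket labels, boundaries and output
path are made up):

import org.apache.spark.sql.functions.{col, when}

val bucketed = df.withColumn(
  "col_a_bucket",
  when(col("column_a") <= 5, "1to5").otherwise("6to10"))

bucketed.write
  .partitionBy("col_a_bucket")      // produces col_a_bucket=1to5/ and col_a_bucket=6to10/
  .parquet("/tmp/bucketed_output")  // hypothetical output path and format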

On Sat, 22 Jul 2017 at 2:47 am, Jain, Nishit  wrote:

> Is it possible to have Spark Data Frame Writer write based on
> RangePartitioning?
>
> For Ex -
>
> I have 10 distinct values for column_a, say 1 to 10.
>
> df.write
> .partitionBy("column_a")
>
> Above code by default will create 10 folders .. column_a=1,column_a=2
> ...column_a=10
>
> I want to see if it is possible to have these partitions based on bucket -
> col_a=1to5, col_a=5-10 .. or something like that? Then also have query
> engine respect it
>
> Thanks,
>
> Nishit
>
-- 
Best Regards,
Ayan Guha


[ Spark SQL ] Conversion from Spark SQL to Avro decimal

2017-07-21 Thread Ernesto Valentino
Hi,
I'm having problems converting decimal data from Spark SQL to Avro.
From a Spark application, I'm trying to write an Avro file with the
following schema for the decimal field:

{ "name":"num", "type": ["null", {"type": "bytes", "logicalType":
"decimal", "precision": 3, "scale": 1}], "doc":"durata" }

Then I convert the number from a string to a BigDecimal, set the scale,
convert it to a byte array, and write the Avro file.
But when I try to query the inserted data (with both Hive and Spark SQL) I
get this error:


Caused by: org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Failed to
obtain scale value from file schema: "bytes"
   ...
Caused by: java.lang.NullPointerException
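
For reference, a sketch of the string -> BigDecimal -> byte-array conversion
described above (the helper name is made up; it follows Avro's convention
that the decimal logical type stores the two's-complement unscaled value):

import java.math.{BigDecimal, RoundingMode}

// scale 1, matching the schema above (precision 3 is not enforced here)
def decimalToAvroBytes(value: String): Array[Byte] = {
  val d = new BigDecimal(value).setScale(1, RoundingMode.HALF_UP)
  d.unscaledValue().toByteArray   // big-endian two's-complement bytes of the unscaled value
}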

Is the error caused by incorrect serialization of the Avro file, or something
else? Has anyone encountered this issue? Any suggestions? Thanks,

Ernesto


Get full RDD lineage for a spark job

2017-07-21 Thread Ron Gonzalez
Hi,
  Can someone point me to a test case or share sample code that is able to
extract the RDD graph from a Spark job anywhere during its lifecycle? I
understand that Spark has a UI that can show the graph of the execution, so
I'm hoping it is using some API somewhere that I could use.
  I know the RDD is the actual execution graph, so if there is also a more
logical abstraction API closer to calls like map, filter, aggregate, etc.,
that would be even better.
  Appreciate any help...

Thanks,
Ron

Unsubscribe

2017-07-21 Thread Siddhartha Khaitan
Please unsubscribe me.


Re: Spark 2.0 and Oracle 12.1 error

2017-07-21 Thread Xiao Li
Could you share the schema of your Oracle table and open a JIRA?

Thanks!

Xiao


2017-07-21 9:40 GMT-07:00 Cassa L :

> I am using 2.2.0. I resolved the problem by removing SELECT * and adding
> column names to the SELECT statement. That works. I'm wondering why SELECT
> * will not work.
>
> Regards,
> Leena
>
> On Fri, Jul 21, 2017 at 8:21 AM, Xiao Li  wrote:
>
>> Could you try 2.2? We fixed multiple Oracle related issues in the latest
>> release.
>>
>> Thanks
>>
>> Xiao
>>
>>
>> On Wed, 19 Jul 2017 at 11:10 PM Cassa L  wrote:
>>
>>> Hi,
>>> I am trying to use Spark to read from Oracle (12.1) table using Spark
>>> 2.0. My table has JSON data.  I am getting below exception in my code. Any
>>> clue?
>>>
>>> >
>>> java.sql.SQLException: Unsupported type -101
>>>
>>> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.
>>> org$apache$spark$sql$execution$datasources$jdbc$
>>> JdbcUtils$$getCatalystType(JdbcUtils.scala:233)
>>> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$
>>> anonfun$8.apply(JdbcUtils.scala:290)
>>> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$
>>> anonfun$8.apply(JdbcUtils.scala:290)
>>> at scala.Option.getOrElse(Option.scala:121)
>>> at
>>>
>>> ==
>>> My code is very simple.
>>>
>>> SparkSession spark = SparkSession
>>> .builder()
>>> .appName("Oracle Example")
>>> .master("local[4]")
>>> .getOrCreate();
>>>
>>> final Properties connectionProperties = new Properties();
>>> connectionProperties.put("user", *"some_user"*));
>>> connectionProperties.put("password", "some_pwd"));
>>>
>>> final String dbTable =
>>> "(select *  from  MySampleTable)";
>>>
>>> Dataset<Row> jdbcDF = spark.read().jdbc(URL, dbTable, connectionProperties);
>>>
>>>
>


Spark Data Frame Writer - Range Partitioning

2017-07-21 Thread Jain, Nishit
Is it possible to have the Spark Data Frame Writer write based on RangePartitioning?

For Ex -

I have 10 distinct values for column_a, say 1 to 10.

df.write
.partitionBy("column_a")


Above code by default will create 10 folders .. column_a=1,column_a=2 
...column_a=10

I want to see if it is possible to have these partitions based on buckets -
col_a=1to5, col_a=5-10, or something like that - and then also have the query
engine respect it.

Thanks,

Nishit


Re: Spark 2.0 and Oracle 12.1 error

2017-07-21 Thread Cassa L
I am using 2.2.0. I resolved the problem by removing SELECT * and adding
column names to the SELECT statement. That works. I'm wondering why SELECT
* will not work.
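
A sketch of that workaround, written in Scala for brevity and assuming the
j_purchaseorder table from earlier in the thread (url is a placeholder for
the JDBC connection string):

// List the columns explicitly instead of using SELECT *.
val dbTable = "(select id, date_loaded, po_document from j_purchaseorder)"
val jdbcDF = spark.read.jdbc(url, dbTable, connectionProperties)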

Regards,
Leena

On Fri, Jul 21, 2017 at 8:21 AM, Xiao Li  wrote:

> Could you try 2.2? We fixed multiple Oracle related issues in the latest
> release.
>
> Thanks
>
> Xiao
>
>
> On Wed, 19 Jul 2017 at 11:10 PM Cassa L  wrote:
>
>> Hi,
>> I am trying to use Spark to read from Oracle (12.1) table using Spark
>> 2.0. My table has JSON data.  I am getting below exception in my code. Any
>> clue?
>>
>> >
>> java.sql.SQLException: Unsupported type -101
>>
>> at org.apache.spark.sql.execution.datasources.jdbc.
>> JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$
>> getCatalystType(JdbcUtils.scala:233)
>> at org.apache.spark.sql.execution.datasources.jdbc.
>> JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
>> at org.apache.spark.sql.execution.datasources.jdbc.
>> JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
>> at scala.Option.getOrElse(Option.scala:121)
>> at
>>
>> ==
>> My code is very simple.
>>
>> SparkSession spark = SparkSession
>> .builder()
>> .appName("Oracle Example")
>> .master("local[4]")
>> .getOrCreate();
>>
>> final Properties connectionProperties = new Properties();
>> connectionProperties.put("user", *"some_user"*));
>> connectionProperties.put("password", "some_pwd"));
>>
>> final String dbTable =
>> "(select *  from  MySampleTable)";
>>
>> Dataset<Row> jdbcDF = spark.read().jdbc(URL, dbTable, connectionProperties);
>>
>>


Re: Spark on Cloudera Configuration (Scheduler Mode = FAIR)

2017-07-21 Thread Marcelo Vanzin
On Fri, Jul 21, 2017 at 5:00 AM, Gokula Krishnan D  wrote:
> Is there anyway can we setup the scheduler mode in Spark Cluster level
> besides application (SC level).

That's called the cluster (or resource) manager. e.g., configure
separate queues in YARN with a maximum number of resources for each.
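
A sketch of what that looks like from the application side, assuming a YARN
queue named "etl" has already been defined in the cluster's scheduler
configuration:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("etl-job")           // hypothetical app name
  .set("spark.yarn.queue", "etl")  // the queue's capacity/weight caps this app's resources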

-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Supporting columns with heterogenous data

2017-07-21 Thread Lalwani, Jayesh
What is a good way to support non-homogeneous input data in Structured Streaming?

Let me explain the use case we are trying to solve. We are reading data from
3 topics in Kafka. All the topics have data in Avro format, each with its own
schema. All 3 Avro schemas have 2 columns: EventHeader and EventPayload.
EventHeader has the same structure in all 3 topics; however, each topic has a
different schema for EventPayload. EventHeader has a field called eventType
that dictates the structure of the EventPayload.

Now, what we want to do is some streaming analytics across all 3 topics. So,
at a high level, we want to:

a)   read the data from all 3 topics from Kafka,

b)   transform into a single streaming Data frame that contains data from 
all 3 topics. This data frame should have a superset of fields from all 3 
schemas

c)   Do some aggregation/windowing on this data frame

d)   Write to a database

The issue is how exactly we do b). Let's say EventPayload in Topic A has
fields x, y and z; in Topic B it has p, q and r; and in Topic C it has x, y
and p. We need a data frame that contains eventID, x, y, z, p, q, r. There
are different approaches we are playing around with. Note that one of the
goals we are trying to achieve is to keep the parsing logic separate.

Option1

1)   Read the Avro data from each Kafka topic as a byte array into its own
data frame. So we have 3 data frames, and all 3 data frames have 1 column
with binary data

2)   Convert each data frame and parse the Avro payload. Now we have 3 data
frames, and all 3 have 2 fields, EventHeader and EventPayload. The StructType
of EventPayload in each of the 3 data frames is different

3)   Convert the 3 data frames into the standard format. Now the output is 3
data frames, all of them with the same structure: EventID, x, y, z, p, q, r

4)   Union these 3 data frames
I am not sure if a union across different streaming data sources will work.
Will it?
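
For illustration, here is a sketch of what steps 3) and 4) could look like
once the three frames are parsed (dfA/dfB/dfC and the string typing of the
missing columns are assumptions, not something from the original message):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StringType

val allColumns = Seq("eventID", "x", "y", "z", "p", "q", "r")

// Add each missing column as a typed null so all three frames share one
// schema, then select the columns in a fixed order before the union.
def toStandard(df: DataFrame): DataFrame = {
  val filled = allColumns.foldLeft(df) { (acc, c) =>
    if (acc.columns.contains(c)) acc else acc.withColumn(c, lit(null).cast(StringType))
  }
  filled.select(allColumns.map(col): _*)
}

val unified = toStandard(dfA).union(toStandard(dfB)).union(toStandard(dfC))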

Option 2

1)   Read the Avro data from all 3 Kafka topics as byte arrays into a single
data frame. So we have 1 data frame, with 1 binary column

2)   Convert the data frame and parse the Avro payload. Convert EventPayload
into some format that can store heterogeneous data, since different rows have
different schemas for EventPayload. We could convert EventPayload into JSON
and store it in a String column, or convert it into Avro and store it in a
binary column

3)   Write a converter that re-parses the EventPayload column and creates a
data frame with the standard structure
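
For illustration, a sketch of what steps 2) and 3) of this option could look
like for a single event type, assuming Spark 2.1+'s from_json, a JSON string
column named payloadJson, a StructType schemaA for that type, and an eventID
field inside EventHeader (all of these names are assumptions):

import org.apache.spark.sql.functions.{col, from_json}

// Re-parse the JSON payload with the schema that matches this event type,
// then flatten it into the standard columns.
val standardizedA = parsedDF
  .filter(col("EventHeader.eventType") === "typeA")
  .withColumn("eventID", col("EventHeader.eventID"))
  .withColumn("payload", from_json(col("payloadJson"), schemaA))
  .select("eventID", "payload.*")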

Is there a better way of doing this?





Re: Spark 2.0 and Oracle 12.1 error

2017-07-21 Thread Xiao Li
Could you try 2.2? We fixed multiple Oracle related issues in the latest
release.

Thanks

Xiao


On Wed, 19 Jul 2017 at 11:10 PM Cassa L  wrote:

> Hi,
> I am trying to use Spark to read from Oracle (12.1) table using Spark 2.0.
> My table has JSON data.  I am getting below exception in my code. Any clue?
>
> >
> java.sql.SQLException: Unsupported type -101
>
> at
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:233)
> at
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
> at
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:290)
> at scala.Option.getOrElse(Option.scala:121)
> at
>
> ==
> My code is very simple.
>
> SparkSession spark = SparkSession
> .builder()
> .appName("Oracle Example")
> .master("local[4]")
> .getOrCreate();
>
> final Properties connectionProperties = new Properties();
> connectionProperties.put("user", *"some_user"*));
> connectionProperties.put("password", "some_pwd"));
>
> final String dbTable =
> "(select *  from  MySampleTable)";
>
> Dataset<Row> jdbcDF = spark.read().jdbc(URL, dbTable, connectionProperties);
>
>


Re: Spark (SQL / Structured Streaming) Cassandra - PreparedStatement

2017-07-21 Thread Russell Spitzer
The SCC (spark-cassandra-connector) includes the Java driver, which means you
can just use Java driver functions. It also provides a serializable wrapper
that handles session and prepared-statement pooling. Something like:

val cc = CassandraConnector(sc.getConf)
SomeFunctionWithAnIterator { it: SomeIterator =>
  cc.withSessionDo { session =>
    val ps = session.prepare("statement")
    it.map(row => session.executeAsync(ps.bind(row)))
  }
} // Do something with the futures here

I wrote a blog post about this here
http://www.russellspitzer.com/2017/02/27/Concurrency-In-Spark/#concurrency-with-the-cassandra-java-driver
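
For a concrete shape of the same pattern, a rough sketch with assumed
keyspace, table and column names (predictions is assumed to be an
RDD[(String, String)]):

import com.datastax.spark.connector.cql.CassandraConnector

val connector = CassandraConnector(sc.getConf)

// One prepared statement per partition; the connector pools the underlying session.
predictions.foreachPartition { rows =>
  connector.withSessionDo { session =>
    val ps = session.prepare("INSERT INTO ks.predictions (id, label) VALUES (?, ?)")
    rows.foreach { case (id, label) => session.execute(ps.bind(id, label)) }
  }
}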

On Tue, Apr 11, 2017 at 4:05 AM Bastien DINE 
wrote:

> Hi everyone,
>
>
>
> I'm using Spark Structured Streaming for Machine Learning purposes in real
> time, and I want to store predictions in my Cassandra cluster.
>
> Since I am in a streaming context, executing the same request multiple
> times per second, one mandatory optimization is to use PreparedStatement.
>
> In the Cassandra Spark driver (
> https://github.com/datastax/spark-cassandra-connector) there is no way to
> use PreparedStatement (in Scala or Python; I'm not considering Java as an
> option).
>
> Should I use a Scala (https://github.com/outworkers/phantom) / Python (
> https://github.com/datastax/python-driver) Cassandra driver?
>
> How does it work then - does my connection object need to be serializable
> to be passed to workers?
>
>
>
> If anyone can help me !
>
>
>
> Thanks :)
>
> Bastien
>


Re: Spark on Cloudera Configuration (Scheduler Mode = FAIR)

2017-07-21 Thread Gokula Krishnan D
Mark & Ayan, thanks for the inputs.

*Is there any way we can set up the scheduler mode at the Spark cluster
level, besides at the application (SparkContext) level?*

Currently YARN is in FAIR mode, and we manually ensure that each Spark
application is also in FAIR mode. However, we noticed that applications do
not release resources as soon as their tasks are done when we set dynamic
allocation = true and do not specify any explicit executor allocation.

At this moment, we specify the min and max executor allocation at the Spark
application level in order to ensure that all of our ETL Spark applications
can run in parallel without any resource issues.
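
For reference, a sketch of the kind of per-application bounds described above
(the property names are standard Spark settings; the numbers are made up):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")               // required for dynamic allocation on YARN
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "25")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")  // idle executors are released after this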

It would be great if you could shed more light on how to set up preemption
within YARN and Spark.

Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Thu, Jul 20, 2017 at 6:46 PM, ayan guha  wrote:

> Hi
>
> As Mark said, scheduler mode works within an application, i.e. within a
> Spark session and Spark context. This is also clear if you think about
> where you set the configuration - in a SparkConf, which is used to build a
> context.
>
> If you are using YARN as the resource manager, however, you can set YARN up
> with the fair scheduler. If you do so, both of your applications will get
> "fair" treatment from YARN, i.e. get resources in a round-robin manner. If
> you want App A to give up resources while it is using them, you need to set
> up preemption within YARN and application priorities so that preemption can
> kick in.
>
> HTH...
>
> Best, Ayan
>
> On Fri, Jul 21, 2017 at 7:11 AM, Mark Hamstra 
> wrote:
>
>> The fair scheduler doesn't have anything to do with reallocating resource
>> across Applications.
>>
>> https://spark.apache.org/docs/latest/job-scheduling.html#sch
>> eduling-across-applications
>> https://spark.apache.org/docs/latest/job-scheduling.html#sch
>> eduling-within-an-application
>>
>> On Thu, Jul 20, 2017 at 2:02 PM, Gokula Krishnan D 
>> wrote:
>>
>>> Mark, Thanks for the response.
>>>
>>> Let me rephrase my statements.
>>>
>>> "I am submitting a Spark application(*Application*#A) with
>>> scheduler.mode as FAIR and dynamicallocation=true and it got all the
>>> available executors.
>>>
>>> In the meantime, submitting another Spark Application (*Application*
>>> # B) with the scheduler.mode as FAIR and dynamicallocation=true but it got
>>> only one executor. "
>>>
>>> Thanks & Regards,
>>> Gokula Krishnan* (Gokul)*
>>>
>>> On Thu, Jul 20, 2017 at 4:56 PM, Mark Hamstra 
>>> wrote:
>>>
 First, Executors are not allocated to Jobs, but rather to Applications.
 If you run multiple Jobs within a single Application, then each of the
 Tasks associated with Stages of those Jobs has the potential to run on any
 of the Application's Executors. Second, once a Task starts running on an
 Executor, it has to complete before another Task can be scheduled using the
 prior Task's resources -- the fair scheduler is not preemptive of running
 Tasks.

 On Thu, Jul 20, 2017 at 1:45 PM, Gokula Krishnan D  wrote:

> Hello All,
>
> We have a cluster with 50 executors, each with 4 cores, so we can use a
> maximum of 200 cores.
>
> I am submitting a Spark application(JOB A) with scheduler.mode as FAIR
> and dynamicallocation=true and it got all the available executors.
>
> In the meantime, submitting another Spark Application (JOB B) with the
> scheduler.mode as FAIR and dynamicallocation=true but it got only one
> executor.
>
> Normally this situation occurs when one of the jobs runs with
> scheduler.mode = FIFO.
>
> 1) Have you ever faced this issue, and if so how did you overcome it?
>
> I was under the impression that as soon as I submit JOB B, the Spark
> scheduler would distribute/release a few resources from JOB A and share
> them with JOB B in a round-robin fashion.
>
> Appreciate your response !!!.
>
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>


>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>