Re: Question on using pseudo columns in spark jdbc options

2017-12-07 Thread रविशंकर नायर
It works perfectly. You can use pseudo columns like ROWNUM in Oracle and
RRN in DB2. To avoid skew you can apply the great coalesce function
afterwards... Spark is sparkling.
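For illustration, a minimal sketch in the option style used later in this thread;
the Oracle connection details and bounds are placeholders, and whether a given
pseudo column behaves well in the range predicates Spark generates (roughly
"ROWNUM >= x AND ROWNUM < y") depends on the database:

// Hypothetical sketch: partition the JDBC read on a pseudo column (Oracle ROWNUM here),
// then coalesce to fewer, more even partitions if some ranges come back skewed or empty.
val df = spark.read.
format("jdbc").
option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL").   // illustrative URL
option("driver", "oracle.jdbc.OracleDriver").
option("user", "scott").
option("password", "tiger").
option("dbtable", "orders").
option("partitionColumn", "ROWNUM").   // pseudo column instead of a primary key
option("lowerBound", "1").
option("upperBound", "68883").
option("numPartitions", "4").
load().
coalesce(2)   // or repartition(n) if a shuffle is needed to even out skew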

Best,


On Thu, Dec 7, 2017 at 2:20 PM, Tomasz Dudek 
wrote:

> Hey Ravion,
>
> yes, you can obviously specify a column other than the primary key. Be aware
> though, that if the key range is not spread evenly (for example, in your
> code, if there's a "gap" in the primary keys and no row has an id between 0 and
> 17220), some of the executors may not assist in loading data (because
> "SELECT * FROM orders WHERE order_id BETWEEN 0 AND 17220" will return an
> empty result). I think you might want to repartition afterwards to ensure
> that the df is evenly distributed (<--- could somebody confirm my last sentence?
> I don't want to mislead and I am not sure).
>
> The first question - could you just check and provide us the answer? :)
>
> Cheers,
> Tomasz
>
> 2017-12-03 7:39 GMT+01:00 ☼ R Nair (रविशंकर नायर) <
> ravishankar.n...@gmail.com>:
>
>> Hi all,
>>
>> I am using a query to fetch data from MYSQL as follows:
>>
>> var df = spark.read.
>> format("jdbc").
>> option("url", "jdbc:mysql://10.0.0.192:3306/retail_db").
>> option("driver" ,"com.mysql.jdbc.Driver").
>> option("user", "retail_dba").
>> option("password", "cloudera").
>> option("dbtable", "orders").
>> option("partitionColumn", "order_id").
>> option("lowerBound", "1").
>> option("upperBound", "68883").
>> option("numPartitions", "4").
>> load()
>>
>> Question is, can I use a pseudo column (like ROWNUM in Oracle or
>> RRN(employeeno) in DB2) in option where I specify the "partitionColumn" ?
>> If not, can we specify a partition column which is not a primary key ?
>>
>> Best,
>> Ravion
>>
>>
>>
>>
>


--


Re: Row Encoder For DataSet

2017-12-07 Thread Georg Heiler
You are looking for a UDAF (user-defined aggregate function).
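For reference, a minimal UDAF sketch using the Spark 2.x UserDefinedAggregateFunction
API; the column names and the sum logic are placeholders for whatever per-group
computation is needed:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical UDAF that sums a column "z" per group; replace with custom logic.
class SumZ extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("z", IntegerType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("acc", LongType) :: Nil)
  def dataType: DataType = LongType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getInt(0)
  def merge(b1: MutableAggregationBuffer, b2: Row): Unit =
    b1(0) = b1.getLong(0) + b2.getLong(0)
  def evaluate(buffer: Row): Any = buffer.getLong(0)
}

// usage: df.groupBy($"x", $"y").agg(new SumZ()($"z"))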
Sandip Mehta  wrote on Fri, Dec 8, 2017 at
06:20:

> Hi,
>
> I want to group on certain columns and then for every group wants to apply
> custom UDF function to it. Currently groupBy only allows to add aggregation
> function to GroupData.
>
> For this was thinking to use groupByKey which will return KeyValueDataSet
> and then apply UDF for every group but really not been able solve this.
>
> SM
>
> On Fri, Dec 8, 2017 at 10:29 AM Weichen Xu 
> wrote:
>
>> You can groupBy multiple columns on dataframe, so why you need so
>> complicated schema ?
>>
>> suppose df schema: (x, y, u, v, z)
>>
>> df.groupBy($"x", $"y").agg(...)
>>
>> Is this you want ?
>>
>> On Fri, Dec 8, 2017 at 11:51 AM, Sandip Mehta > > wrote:
>>
>>> Hi,
>>>
>>> During my aggregation I end up having following schema.
>>>
>>> Row(Row(val1,val2), Row(val1,val2,val3...))
>>>
>>> val values = Seq(
>>> (Row(10, 11), Row(10, 2, 11)),
>>> (Row(10, 11), Row(10, 2, 11)),
>>> (Row(20, 11), Row(10, 2, 11))
>>>   )
>>>
>>>
>>> 1st tuple is used to group the relevant records for aggregation. I have
>>> used following to create dataset.
>>>
>>> val s = StructType(Seq(
>>>   StructField("x", IntegerType, true),
>>>   StructField("y", IntegerType, true)
>>> ))
>>> val s1 = StructType(Seq(
>>>   StructField("u", IntegerType, true),
>>>   StructField("v", IntegerType, true),
>>>   StructField("z", IntegerType, true)
>>> ))
>>>
>>> val ds = 
>>> sparkSession.sqlContext.createDataset(sparkSession.sparkContext.parallelize(values))(Encoders.tuple(RowEncoder(s),
>>>  RowEncoder(s1)))
>>>
>>> Is this correct way of representing this?
>>>
>>> How do I create dataset and row encoder for such use case for doing
>>> groupByKey on this?
>>>
>>>
>>>
>>> Regards
>>> Sandeep
>>>
>>
>>


Re: Row Encoder For DataSet

2017-12-07 Thread Sandip Mehta
Hi,

I want to group on certain columns and then, for every group, apply a
custom UDF to it. Currently groupBy only allows adding an aggregation
function to the resulting grouped data (RelationalGroupedDataset).

For this I was thinking of using groupByKey, which will return a
KeyValueGroupedDataset, and then applying the UDF to every group, but I have
not been able to solve this.

SM

On Fri, Dec 8, 2017 at 10:29 AM Weichen Xu 
wrote:

> You can groupBy multiple columns on dataframe, so why you need so
> complicated schema ?
>
> suppose df schema: (x, y, u, v, z)
>
> df.groupBy($"x", $"y").agg(...)
>
> Is this you want ?
>
> On Fri, Dec 8, 2017 at 11:51 AM, Sandip Mehta 
> wrote:
>
>> Hi,
>>
>> During my aggregation I end up having following schema.
>>
>> Row(Row(val1,val2), Row(val1,val2,val3...))
>>
>> val values = Seq(
>> (Row(10, 11), Row(10, 2, 11)),
>> (Row(10, 11), Row(10, 2, 11)),
>> (Row(20, 11), Row(10, 2, 11))
>>   )
>>
>>
>> 1st tuple is used to group the relevant records for aggregation. I have
>> used following to create dataset.
>>
>> val s = StructType(Seq(
>>   StructField("x", IntegerType, true),
>>   StructField("y", IntegerType, true)
>> ))
>> val s1 = StructType(Seq(
>>   StructField("u", IntegerType, true),
>>   StructField("v", IntegerType, true),
>>   StructField("z", IntegerType, true)
>> ))
>>
>> val ds = 
>> sparkSession.sqlContext.createDataset(sparkSession.sparkContext.parallelize(values))(Encoders.tuple(RowEncoder(s),
>>  RowEncoder(s1)))
>>
>> Is this correct way of representing this?
>>
>> How do I create dataset and row encoder for such use case for doing
>> groupByKey on this?
>>
>>
>>
>> Regards
>> Sandeep
>>
>
>


[Spark SQL]: Dataset<Row> can not map into Dataset<Integer> in java

2017-12-07 Thread Himasha de Silva
Hi,

 I'm trying to map a Dataset<Row> that is read from csv files into a
Dataset<Integer>.
But it gives some errors. Can anyone please help me to figure it out?

Dataset<Row> t_en_data = session.read().option("header","true")
.option("inferSchema","true")
.csv("J:\\csv_path\\T_EN");

Dataset<Integer> mappedDataset = t_en_data
.map((MapFunction<Row, Integer>) row
-> row.getAs("Duration"), Encoders.INT());
mappedDataset.show(false);


error - https://gist.github.com/anonymous/614a1746162dd12bb0b810add85deda2

Thanks
-- 

Himasha De Silva


Re: Row Encoder For DataSet

2017-12-07 Thread Weichen Xu
You can groupBy multiple columns on a dataframe, so why do you need such a
complicated schema?

suppose df schema: (x, y, u, v, z)

df.groupBy($"x", $"y").agg(...)

Is this what you want?
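To make that concrete for the nested (Row, Row) layout quoted below, a small
sketch (Spark 2.x; the tuple columns are named _1/_2 by default) that flattens the
key and value structs into top-level columns so a plain groupBy/agg works; the
sum on z is just a placeholder aggregate:

import org.apache.spark.sql.functions._

// ds is the Dataset[(Row, Row)] built below with Encoders.tuple(RowEncoder(s), RowEncoder(s1));
// its columns are the structs _1 (x, y) and _2 (u, v, z).
val flat = ds.select(
  col("_1.x").as("x"), col("_1.y").as("y"),
  col("_2.u").as("u"), col("_2.v").as("v"), col("_2.z").as("z"))

flat.groupBy(col("x"), col("y")).agg(sum(col("z")).as("z_total")).show()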

On Fri, Dec 8, 2017 at 11:51 AM, Sandip Mehta 
wrote:

> Hi,
>
> During my aggregation I end up having following schema.
>
> Row(Row(val1,val2), Row(val1,val2,val3...))
>
> val values = Seq(
> (Row(10, 11), Row(10, 2, 11)),
> (Row(10, 11), Row(10, 2, 11)),
> (Row(20, 11), Row(10, 2, 11))
>   )
>
>
> 1st tuple is used to group the relevant records for aggregation. I have
> used following to create dataset.
>
> val s = StructType(Seq(
>   StructField("x", IntegerType, true),
>   StructField("y", IntegerType, true)
> ))
> val s1 = StructType(Seq(
>   StructField("u", IntegerType, true),
>   StructField("v", IntegerType, true),
>   StructField("z", IntegerType, true)
> ))
>
> val ds = 
> sparkSession.sqlContext.createDataset(sparkSession.sparkContext.parallelize(values))(Encoders.tuple(RowEncoder(s),
>  RowEncoder(s1)))
>
> Is this correct way of representing this?
>
> How do I create dataset and row encoder for such use case for doing
> groupByKey on this?
>
>
>
> Regards
> Sandeep
>


[Spark SQL]: Dataset<Row> can not map into Dataset<Integer> in java

2017-12-07 Thread Himasha de Silva
Hi,
I'm new to spark. I'm trying to map a Dataset<Row> that is read from csv files
into a Dataset<Integer>.
But it gives some errors. Can anyone please help me to figure it out?

my code, csv file and error log attached here.

Thank you.
-- 

Himasha De Silva

Undergraduate,

Department of Computer Engineering,

University of Peradeniya.

Direct: +9477 552 4152 <+94%2077%20552%204152>

Email: himasha...@gmail.com
17/12/08 09:46:47 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4, 
172.31.48.232, executor 0): java.lang.ClassCastException: cannot assign 
instance of scala.collection.immutable.List$SerializationProxy to field 
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type 
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

17/12/08 09:46:47 INFO TaskSetManager: Starting task 0.1 in stage 2.0 (TID 5, 
172.31.48.232, executor 0, partition 0, PROCESS_LOCAL, 5430 bytes)
17/12/08 09:46:47 INFO TaskSetManager: Lost task 0.1 in stage 2.0 (TID 5) on 
172.31.48.232, executor 0: java.lang.ClassCastException (cannot assign instance 
of scala.collection.immutable.List$SerializationProxy to field 
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type 
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD) 
[duplicate 1]
17/12/08 09:46:47 INFO TaskSetManager: Starting task 0.2 in stage 2.0 (TID 6, 
172.31.48.232, executor 0, partition 0, PROCESS_LOCAL, 5430 bytes)
17/12/08 09:46:47 INFO TaskSetManager: Lost task 0.2 in stage 2.0 (TID 6) on 
172.31.48.232, executor 0: java.lang.ClassCastException (cannot assign instance 
of scala.collection.immutable.List$SerializationProxy to field 
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type 
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD) 
[duplicate 2]
17/12/08 09:46:47 INFO TaskSetManager: Starting task 0.3 in stage 2.0 (TID 7, 
172.31.48.232, executor 0, partition 0, PROCESS_LOCAL, 5430 bytes)
17/12/08 09:46:47 INFO TaskSetManager: Lost task 0.3 in stage 2.0 (TID 7) on 
172.31.48.232, executor 0: java.lang.ClassCastException (cannot assign instance 
of scala.collection.immutable.List$SerializationProxy to field 
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type 
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD) 
[duplicate 3]
17/12/08 09:46:47 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; 
aborting job
17/12/08 09:46:47 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have 
all completed, from pool 
17/12/08 09:46:47 INFO TaskSchedulerImpl: Cancelling stage 2
17/12/08 09:46:47 INFO DAGScheduler: ResultStage 2 (show at 
BatchProcess.java:38) failed in 0.450 s due to Job aborted due to stage 
failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 
in stage 2.0 (TID 7, 172.31.48.232, executor 0): java.lang.ClassCastException: 
cannot assign instance of scala.collection.immutable.List$SerializationProxy to 
field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type 
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java

RDD[internalRow] -> DataSet

2017-12-07 Thread satyajit vegesna
Hi All,

Is there a way to convert RDD[InternalRow] to a Dataset, from outside the spark
sql package?
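Not sure there is a supported public API for this, but one possible sketch for
Spark 2.x, assuming ExpressionEncoder still exposes fromRow and that the schema of
the InternalRows is known; these are catalyst-internal classes, so treat this as
unofficial:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

// Convert each InternalRow back to an external Row with a bound encoder,
// then build a DataFrame (Dataset[Row]) the usual way.
def toDF(spark: SparkSession, rdd: RDD[InternalRow], schema: StructType): DataFrame = {
  val rows = rdd.mapPartitions { iter =>
    val encoder = RowEncoder(schema).resolveAndBind()  // one encoder per partition
    iter.map(encoder.fromRow)
  }
  spark.createDataFrame(rows, schema)
}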

Regards,
Satyajit.


Row Encoder For DataSet

2017-12-07 Thread Sandip Mehta
Hi,

During my aggregation I end up having following schema.

Row(Row(val1,val2), Row(val1,val2,val3...))

val values = Seq(
(Row(10, 11), Row(10, 2, 11)),
(Row(10, 11), Row(10, 2, 11)),
(Row(20, 11), Row(10, 2, 11))
  )


The 1st tuple is used to group the relevant records for aggregation. I have
used the following to create the dataset.

val s = StructType(Seq(
  StructField("x", IntegerType, true),
  StructField("y", IntegerType, true)
))
val s1 = StructType(Seq(
  StructField("u", IntegerType, true),
  StructField("v", IntegerType, true),
  StructField("z", IntegerType, true)
))

val ds = sparkSession.sqlContext.createDataset(
  sparkSession.sparkContext.parallelize(values))(
  Encoders.tuple(RowEncoder(s), RowEncoder(s1)))

Is this the correct way of representing this?

How do I create a dataset and row encoder for such a use case, for doing
groupByKey on this?
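For the groupByKey part specifically, one possible sketch (Spark 2.x), passing
the row encoders explicitly since Row has no implicit encoder; the per-group
logic here just sums z and is only a placeholder:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

// Output schema for the per-group result: the key fields plus an aggregate.
val out = StructType(Seq(
  StructField("x", IntegerType, true),
  StructField("y", IntegerType, true),
  StructField("zTotal", IntegerType, true)
))

val grouped = ds
  .groupByKey((pair: (Row, Row)) => pair._1)(RowEncoder(s))     // group on the first Row
  .mapGroups { (key: Row, rows: Iterator[(Row, Row)]) =>
    val zTotal = rows.map(_._2.getInt(2)).sum                   // custom per-group logic goes here
    Row(key.getInt(0), key.getInt(1), zTotal)
  }(RowEncoder(out))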



Regards
Sandeep


Re: Spark job only starts tasks on a single node

2017-12-07 Thread Ji Yan
This used to work. The only thing that has changed is that the mesos installed
on the Spark executor is a different version from before. My Spark executor
runs in a container, the image of which has mesos installed. The version of
that mesos is actually different from the version of the mesos master. Not sure
if that is the problem though. I am trying to bring the old version of mesos
back to the Spark executor image. Does anyone know whether a mesos slave and
master not running the same version could lead to this problem?

On Thu, Dec 7, 2017 at 11:34 AM, Art Rand  wrote:

> Sounds a little like the driver got one offer when it was using zero
> resources, then it's not getting any more. How many frameworks (and which)
> are running on the cluster? The Mesos Master log should say which
> frameworks are getting offers, and should help diagnose the problem.
>
> A
>
> On Thu, Dec 7, 2017 at 10:18 AM, Susan X. Huynh 
> wrote:
>
>> Sounds strange. Maybe it has to do with the job itself? What kind of job
>> is it? Have you gotten it to run on more than one node before? What's in
>> the spark-submit command?
>>
>> Susan
>>
>> On Wed, Dec 6, 2017 at 11:21 AM, Ji Yan  wrote:
>>
>>> I am sure that the other agents have plentiful enough resources, but I
>>> don't know why Spark only scheduled executors on one single node, up to
>>> that node's capacity ( it is a different node everytime I run btw ).
>>>
>>> I checked the DEBUG log from Spark Driver, didn't see any mention of
>>> decline. But from log, it looks like it has only accepted one offer from
>>> Mesos.
>>>
>>> Also looks like there is no special role required on Spark part!
>>>
>>> On Wed, Dec 6, 2017 at 5:57 AM, Art Rand  wrote:
>>>
 Hello Ji,

 Spark will launch Executors round-robin on offers, so when the
 resources on an agent get broken into multiple resource offers it's
 possible that many Executors get placed on a single agent. However, from
 your description, it's not clear why your other agents do not get Executors
 scheduled on them. It's possible that the offers from your other agents are
 insufficient in some way. The Mesos MASTER log should show offers being
 declined by your Spark Driver, do you see that?  If you have DEBUG level
 logging in your Spark driver you should also see offers being declined
 
 there. Finally if your Spark framework isn't receiving any resource offers,
 it could be because of the roles you have established on your agents or
 quota set on other frameworks, have you set up any of that? Hope this 
 helps!

 Art

 On Tue, Dec 5, 2017 at 10:45 PM, Ji Yan  wrote:

> Hi all,
>
> I am running Spark 2.0 on Mesos 1.1. I was trying to split up my job
> onto several nodes. I try to set the number of executors by the formula
> (spark.cores.max / spark.executor.cores). The behavior I saw was that 
> Spark
> will try to fill up on one mesos node as many executors as it can, then it
> stops going to other mesos nodes despite that it has not done scheduling
> all the executors I have asked it to yet! This is super weird!
>
> Did anyone notice this behavior before? Any help appreciated!
>
> Ji
>
> The information in this email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful.
>


>>>
>>> The information in this email is confidential and may be legally
>>> privileged. It is intended solely for the addressee. Access to this email
>>> by anyone else is unauthorized. If you are not the intended recipient, any
>>> disclosure, copying, distribution or any action taken or omitted to be
>>> taken in reliance on it, is prohibited and may be unlawful.
>>>
>>
>>
>>
>> --
>> Susan X. Huynh
>> Software engineer, Data Agility
>> xhu...@mesosphere.com
>>
>
>

-- 
 

The information in this email is confidential and may be legally 
privileged. It is intended solely for the addressee. Access to this email 
by anyone else is unauthorized. If you are not the intended recipient, any 
disclosure, copying, distribution or any action taken or omitted to be 
taken in reliance on it, is prohibited and may be unlawful.


Re: Programmatically get status of job (WAITING/RUNNING)

2017-12-07 Thread Qiao, Richard
Regarding the example in your question, the answer is yes.
“For example, if an application wanted 4 executors
(spark.executor.instances=4) but the spark cluster can only provide 1
executor. This means that I will only receive 1 onExecutorAdded event. Will
the application state change to RUNNING (even if 1 executor was allocated)?
“

Best Regards
Richard


On 12/7/17, 2:40 PM, "bsikander"  wrote:

Marcelo Vanzin wrote
> I'm not sure I follow you here. This is something that you are
> defining, not Spark.

Yes, you are right. In my code, 
1) my notion of RUNNING is that both driver + executors are in RUNNING
state.
2) my notion of WAITING is if any one of driver/executor is in WAITING
state.

So,
- SparkLauncher provides me the details about the "driver".
RUNNING/SUBMITTED/WAITING
- SparkListener provides me the details about the "executor" using
onExecutorAdded/onExecutorDeleted

I want to combine both SparkLauncher + SparkListener to achieve my view of
RUNNING/WAITING.

The only thing confusing me here is that I don't know how Spark internally
converts applications from WAITING to RUNNING state.
For example, if an application wanted 4 executors
(spark.executor.instances=4) but the spark cluster can only provide 1
executor. This means that I will only receive 1 onExecutorAdded event. Will
the application state change to RUNNING (even if 1 executor was allocated)?

If I am clear on this logic I can implement my feature.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: Programmatically get status of job (WAITING/RUNNING)

2017-12-07 Thread Qiao, Richard
For #2, do you mean “RUNNING” showing in the “Driver” table? If yes, that is not a
problem, because the driver does run while there is no executor available, and that
can be a status for you to catch – driver running while no executors.
Comparing #1 and #3, my understanding of “submitted” is “the jar is submitted
to executors”. With this concept, you may define your own status.

Best Regards
Richard


On 12/4/17, 4:06 AM, "bsikander"  wrote:

So, I tried to use SparkAppHandle.Listener with SparkLauncher as you
suggested. The behavior of Launcher is not what I expected.

1- If I start the job (using SparkLauncher) and my Spark cluster has enough
cores available, I receive events in my class extending
SparkAppHandle.Listener and I see the status getting changed from
UNKOWN->CONNECTED -> SUBMITTED -> RUNNING. All good here.

2- If my Spark cluster has cores only for my Driver process (running in
cluster mode) but no cores for my executor, then I still receive the RUNNING
event. I was expecting something else since my executor has no cores and
Master UI shows WAITING state for executors, listener should respond with
SUBMITTED state instead of RUNNING.

3- If my Spark cluster has no cores for even the driver process then
SparkLauncher invokes no events at all. The state stays in UNKNOWN. I would
have expected it to be in SUBMITTED state atleast.

*Is there any way with which I can reliably get the WAITING state of job?*
Driver=RUNNING, executor=RUNNING, overall state should be RUNNING
Driver=RUNNING, executor=WAITING overall state should be SUBMITTED/WAITING
Driver=WAITING, executor=WAITING overall state should be
CONNECTED/SUBMITTED/WAITING







--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: Programmatically get status of job (WAITING/RUNNING)

2017-12-07 Thread Marcelo Vanzin
That's the Spark Master's view of the application. I don't know
exactly what it means in the different run modes, I'm more familiar
with YARN. But I wouldn't be surprised if, as with others, it mostly
tracks the driver's state.

On Thu, Dec 7, 2017 at 12:06 PM, bsikander  wrote:
> 
>
> See the image. I am referring to this state when I say "Application State".
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Programmatically get status of job (WAITING/RUNNING)

2017-12-07 Thread bsikander

 

See the image. I am referring to this state when I say "Application State".



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: sparkSession.sql("sql query") vs df.sqlContext().sql(this.query) ?

2017-12-07 Thread khathiravan raj maadhaven
Hi Kant,

Based on my understanding, I think the only difference is the overhead of
the selection/creation of the SQLContext for the query you have passed. As the
table / view is already available for use, sparkSession.sql('your query')
should be simple & good enough.

The following uses the session/context created and available by default:

sparkSession.sql("select value from table")

while the following would look up or create one and then run the query (which I
believe is extra overhead):

df.sqlContext().sql("select value from table")

Regards
Raj



On Wed, Dec 6, 2017 at 6:07 PM, kant kodali  wrote:

> Hi All,
>
> I have the following snippets of the code and I wonder what is the
> difference between these two and which one should I use? I am using spark
> 2.2.
>
> Dataset<Row> df = sparkSession.readStream()
> .format("kafka")
> .load();
>
> df.createOrReplaceTempView("table");
> df.printSchema();
>
> Dataset<Row> resultSet = df.sqlContext().sql(
> "select value from table"); //sparkSession.sql(this.query);
> StreamingQuery streamingQuery = resultSet
> .writeStream()
> .trigger(Trigger.ProcessingTime(1000))
> .format("console")
> .start();
>
>
> vs
>
>
> Dataset<Row> df = sparkSession.readStream()
> .format("kafka")
> .load();
>
> df.createOrReplaceTempView("table");
>
> Dataset<Row> resultSet = sparkSession.sql(
> "select value from table"); //sparkSession.sql(this.query);
> StreamingQuery streamingQuery = resultSet
> .writeStream()
> .trigger(Trigger.ProcessingTime(1000))
> .format("console")
> .start();
>
>
> Thanks!
>
>


Best way of shipping self-contained pyspark jobs with 3rd-party dependencies

2017-12-07 Thread Sergey Zhemzhitsky
Hi PySparkers,

What currently is the best way of shipping self-contained pyspark jobs with
3rd-party dependencies?
There are some open JIRA issues [1], [2] as well as corresponding PRs [3],
[4] and articles [5], [6], regarding setting up the python environment with
conda and virtualenv respectively.

So I'm wondering what the community does in cases, when it's necessary to
- prevent python package/module version conflicts between different jobs
- prevent updating all the nodes of the cluster in case of new job
dependencies
- track which dependencies are introduced on the per-job basis


[1] https://issues.apache.org/jira/browse/SPARK-13587
[2] https://issues.apache.org/jira/browse/SPARK-16367
[3] https://github.com/apache/spark/pull/13599
[4] https://github.com/apache/spark/pull/14180
[5] https://www.anaconda.com/blog/developer-blog/conda-spark/
[6]
http://henning.kropponline.de/2016/09/17/running-pyspark-with-virtualenv/


Re: Programmatically get status of job (WAITING/RUNNING)

2017-12-07 Thread Marcelo Vanzin
On Thu, Dec 7, 2017 at 11:40 AM, bsikander  wrote:
> For example, if an application wanted 4 executors
> (spark.executor.instances=4) but the spark cluster can only provide 1
> executor. This means that I will only receive 1 onExecutorAdded event. Will
> the application state change to RUNNING (even if 1 executor was allocated)?

What application state are you talking about? That's the thing that
you seem to be confused about here.

As you've already learned, SparkLauncher only cares about the driver.
So RUNNING means the driver is running.

And there's no concept of running anywhere else I know of that is
exposed to Spark applications. So I don't know which code you're
referring to when you say "the application state change to RUNNING".

-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark job only starts tasks on a single node

2017-12-07 Thread Art Rand
Sounds a little like the driver got one offer when it was using zero
resources, then it's not getting any more. How many frameworks (and which)
are running on the cluster? The Mesos Master log should say which
frameworks are getting offers, and should help diagnose the problem.

A

On Thu, Dec 7, 2017 at 10:18 AM, Susan X. Huynh 
wrote:

> Sounds strange. Maybe it has to do with the job itself? What kind of job
> is it? Have you gotten it to run on more than one node before? What's in
> the spark-submit command?
>
> Susan
>
> On Wed, Dec 6, 2017 at 11:21 AM, Ji Yan  wrote:
>
>> I am sure that the other agents have plentiful enough resources, but I
>> don't know why Spark only scheduled executors on one single node, up to
>> that node's capacity ( it is a different node everytime I run btw ).
>>
>> I checked the DEBUG log from Spark Driver, didn't see any mention of
>> decline. But from log, it looks like it has only accepted one offer from
>> Mesos.
>>
>> Also looks like there is no special role required on Spark part!
>>
>> On Wed, Dec 6, 2017 at 5:57 AM, Art Rand  wrote:
>>
>>> Hello Ji,
>>>
>>> Spark will launch Executors round-robin on offers, so when the resources
>>> on an agent get broken into multiple resource offers it's possible that
>>> many Executors get placed on a single agent. However, from your
>>> description, it's not clear why your other agents do not get Executors
>>> scheduled on them. It's possible that the offers from your other agents are
>>> insufficient in some way. The Mesos MASTER log should show offers being
>>> declined by your Spark Driver, do you see that?  If you have DEBUG level
>>> logging in your Spark driver you should also see offers being declined
>>> 
>>> there. Finally if your Spark framework isn't receiving any resource offers,
>>> it could be because of the roles you have established on your agents or
>>> quota set on other frameworks, have you set up any of that? Hope this helps!
>>>
>>> Art
>>>
>>> On Tue, Dec 5, 2017 at 10:45 PM, Ji Yan  wrote:
>>>
 Hi all,

 I am running Spark 2.0 on Mesos 1.1. I was trying to split up my job
 onto several nodes. I try to set the number of executors by the formula
 (spark.cores.max / spark.executor.cores). The behavior I saw was that Spark
 will try to fill up on one mesos node as many executors as it can, then it
 stops going to other mesos nodes despite that it has not done scheduling
 all the executors I have asked it to yet! This is super weird!

 Did anyone notice this behavior before? Any help appreciated!

 Ji

 The information in this email is confidential and may be legally
 privileged. It is intended solely for the addressee. Access to this email
 by anyone else is unauthorized. If you are not the intended recipient, any
 disclosure, copying, distribution or any action taken or omitted to be
 taken in reliance on it, is prohibited and may be unlawful.

>>>
>>>
>>
>> The information in this email is confidential and may be legally
>> privileged. It is intended solely for the addressee. Access to this email
>> by anyone else is unauthorized. If you are not the intended recipient, any
>> disclosure, copying, distribution or any action taken or omitted to be
>> taken in reliance on it, is prohibited and may be unlawful.
>>
>
>
>
> --
> Susan X. Huynh
> Software engineer, Data Agility
> xhu...@mesosphere.com
>


Re: Programmatically get status of job (WAITING/RUNNING)

2017-12-07 Thread bsikander
Marcelo Vanzin wrote
> I'm not sure I follow you here. This is something that you are
> defining, not Spark.

Yes, you are right. In my code, 
1) my notion of RUNNING is that both driver + executors are in RUNNING
state.
2) my notion of WAITING is if any one of driver/executor is in WAITING
state.

So,
- SparkLauncher provides me the details about the "driver".
RUNNING/SUBMITTED/WAITING
- SparkListener provides me the details about the "executor" using
onExecutorAdded/onExecutorDeleted

I want to combine both SparkLauncher + SparkListener to achieve my view of
RUNNING/WAITING.

The only thing confusing me here is that I don't know how Spark internally
converts applications from WAITING to RUNNING state.
For example, if an application wanted 4 executors
(spark.executor.instances=4) but the spark cluster can only provide 1
executor. This means that I will only receive 1 onExecutorAdded event. Will
the application state change to RUNNING (even if 1 executor was allocated)?

If I am clear on this logic I can implement my feature.
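For reference, a rough sketch of that SparkLauncher + SparkListener combination.
The listener runs inside the application while the launcher runs outside it, so in
practice the executor count has to be published somewhere both sides can read;
the jar path, main class, and master URL below are hypothetical:

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Executor-side view: register inside the application via sparkContext.addSparkListener(...)
class ExecutorCountListener extends SparkListener {
  val executors = new AtomicInteger(0)
  override def onExecutorAdded(e: SparkListenerExecutorAdded): Unit = executors.incrementAndGet()
  override def onExecutorRemoved(e: SparkListenerExecutorRemoved): Unit = executors.decrementAndGet()
}

// Driver-side view: SparkLauncher reports only the driver's state.
object LaunchWithStatus {
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setAppResource("/path/to/my-app.jar")       // hypothetical
      .setMainClass("com.example.MyApp")           // hypothetical
      .setMaster("spark://master:7077")            // hypothetical
      .startApplication(new SparkAppHandle.Listener {
        override def stateChanged(h: SparkAppHandle): Unit =
          println(s"driver state: ${h.getState}")  // e.g. SUBMITTED, RUNNING, FINISHED
        override def infoChanged(h: SparkAppHandle): Unit = ()
      })
    // The combined WAITING/RUNNING view would join handle.getState with the
    // executor count published by ExecutorCountListener (e.g. via a shared store).
  }
}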



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Question on using pseudo columns in spark jdbc options

2017-12-07 Thread Tomasz Dudek
Hey Ravion,

yes, you can obviously specify a column other than the primary key. Be aware
though, that if the key range is not spread evenly (for example, in your
code, if there's a "gap" in the primary keys and no row has an id between 0 and
17220), some of the executors may not assist in loading data (because
"SELECT * FROM orders WHERE order_id BETWEEN 0 AND 17220" will return an
empty result). I think you might want to repartition afterwards to ensure
that the df is evenly distributed (<--- could somebody confirm my last sentence?
I don't want to mislead and I am not sure).

The first question - could you just check and provide us the answer? :)

Cheers,
Tomasz

2017-12-03 7:39 GMT+01:00 ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com>:

> Hi all,
>
> I am using a query to fetch data from MYSQL as follows:
>
> var df = spark.read.
> format("jdbc").
> option("url", "jdbc:mysql://10.0.0.192:3306/retail_db").
> option("driver" ,"com.mysql.jdbc.Driver").
> option("user", "retail_dba").
> option("password", "cloudera").
> option("dbtable", "orders").
> option("partitionColumn", "order_id").
> option("lowerBound", "1").
> option("upperBound", "68883").
> option("numPartitions", "4").
> load()
>
> Question is, can I use a pseudo column (like ROWNUM in Oracle or
> RRN(employeeno) in DB2) in option where I specify the "partitionColumn" ?
> If not, can we specify a partition column which is not a primary key ?
>
> Best,
> Ravion
>
>
>
>


Re: Streaming Analytics/BI tool to connect Spark SQL

2017-12-07 Thread Pierce Lamb
Hi Umar,

While this answer is a bit dated, you may find it useful in choosing a
store for Spark SQL tables:

https://stackoverflow.com/a/39753976/3723346

I don't know much about Pentaho or Arcadia, but I assume many of the listed
options have a JDBC or ODBC client.

Hope this helps,

Pierce

On Thu, Dec 7, 2017 at 10:27 AM, umargeek 
wrote:

> Hi All,
>
> We are currently looking for real-time streaming analytics of data stored
> as
> Spark SQL tables is there any external connectivity available to connect
> with BI tools(Pentaho/Arcadia).
>
> currently, we are storing data into the hive tables but its response on the
> Arcadia dashboard is slow.
>
> Looking for suggestions whether to move out from hive or any connectivity
> for Spark SQL or to Ignite?
>
> Thanks,
> Umar
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Do I need to do .collect inside forEachRDD

2017-12-07 Thread Qiao, Richard
Kant, right, we cannot use the Driver’s producer in an executor. That’s why I
mentioned the “kafka sink” to solve it.
This article should be helpful about it 
https://allegro.tech/2015/08/spark-kafka-integration.html
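A rough sketch of that sink pattern, along the lines of the article above: wrap a
lazily created producer so only the creation function, not the producer itself, is
serialized to the executors. Config values and the usage lines are illustrative:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Serializable wrapper: the producer is built lazily on each executor, never shipped from the driver.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()
  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink =
    new KafkaSink(() => new KafkaProducer[String, String](config))
}

// usage (driver side): val sink = ssc.sparkContext.broadcast(KafkaSink(props))
// then inside foreachRDD / foreachPartition: sink.value.send("topicA", json)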

Best Regards
Richard


From: kant kodali 
Date: Thursday, December 7, 2017 at 12:39 PM
To: "Qiao, Richard" 
Cc: Gerard Maas , "user @spark" 
Subject: Re: Do I need to do .collect inside forEachRDD

Hi Richard,

I had tried your sample code now and several times in the past as well. The 
problem seems to be kafkaProducer is not serializable. so I get "Task not 
serializable exception" and my kafkaProducer object is created using the 
following jar.

group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'

On Thu, Dec 7, 2017 at 2:46 AM, Qiao, Richard 
mailto:richard.q...@capitalone.com>> wrote:
Thanks for sharing the code.
The 1st problem in the first code is that the map is allocated in the Driver, but
the code is trying to put data into it on the Executors and then retrieve it in the
driver to send to Kafka. You are using this map as if it were an accumulator, but it
doesn’t work that way.

The 2nd problem is that both versions of the code collect RDD-level data to
generate a single JSON string and then send it to Kafka, which could produce a
very long JSON string if your TPS is very high.
If possible you can send smaller JSON strings at the task level, such as:
.foreachRDD(stringLongJavaPairRDD -> {
    stringLongJavaPairRDD.foreachPartition(partition -> {
        Map<String, Long> map = new HashMap<>(); // defined per task
        partition.forEachRemaining(stringLongTuple2 ->
            map.put(stringLongTuple2._1(), stringLongTuple2._2()));
        // send a smaller json per task
        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });
});
When you do it, make sure the kafka producer (see the kafka sink pattern for it)
and gson are set up correctly in the executors.

If after this, there is still OOM, let’s discuss further.

Best Regards
Richard


From: kant kodali mailto:kanth...@gmail.com>>
Date: Thursday, December 7, 2017 at 2:30 AM
To: Gerard Maas mailto:gerard.m...@gmail.com>>
Cc: "Qiao, Richard" 
mailto:richard.q...@capitalone.com>>, "user 
@spark" mailto:user@spark.apache.org>>
Subject: Re: Do I need to do .collect inside forEachRDD

@Richard I had pasted the two versions of the code below and I still couldn't 
figure out why it wouldn't work without .collect ?  Any help would be great


The code below doesn't work and sometime I also run into OutOfMemory error.

jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long quantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {
Map<String, Long> map = new HashMap<>();
stringLongJavaPairRDD.foreach(stringLongTuple2 -> {

System.out.println(stringLongTuple2._1()); // Works I can see 
values getting printed out

System.out.println(stringLongTuple2._2()); // Works I can see 
values getting printed out

map.put(stringLongTuple2._1(), stringLongTuple2._2())

});

System.out.println(gson.toJson(map)); // Prints empty json doc string 
"{}" always. But why? especially

// when the map is getting filled values as confirmed by the print 
statements above

producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
});

jssc.start();

jssc.awaitTermination();



  VS

The below code works but it is slow because .collectAsMap



jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long quantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {

LinkedHashMap<String, Long> map = new
LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());

producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));

});

jssc.start();

jssc.awaitTermination();





On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas 
mailto:gerard.m...@gmail.com>> wrote:
Hi Kant,

>  but would your answer on .collect() change depending on running the spark 
> app in client vs cluster mode?

No, it should make no difference.

-kr, Gerard.

On Tue, Dec 5, 2017 at 11:34 PM, kant kodali 
mailto:kanth...@gmail.com>> wrote:
@Richard I don't see any error in the executor log but let me ru

Re: Spark job only starts tasks on a single node

2017-12-07 Thread Susan X. Huynh
Sounds strange. Maybe it has to do with the job itself? What kind of job is
it? Have you gotten it to run on more than one node before? What's in the
spark-submit command?

Susan

On Wed, Dec 6, 2017 at 11:21 AM, Ji Yan  wrote:

> I am sure that the other agents have plentiful enough resources, but I
> don't know why Spark only scheduled executors on one single node, up to
> that node's capacity ( it is a different node everytime I run btw ).
>
> I checked the DEBUG log from Spark Driver, didn't see any mention of
> decline. But from log, it looks like it has only accepted one offer from
> Mesos.
>
> Also looks like there is no special role required on Spark part!
>
> On Wed, Dec 6, 2017 at 5:57 AM, Art Rand  wrote:
>
>> Hello Ji,
>>
>> Spark will launch Executors round-robin on offers, so when the resources
>> on an agent get broken into multiple resource offers it's possible that
>> many Executors get placed on a single agent. However, from your
>> description, it's not clear why your other agents do not get Executors
>> scheduled on them. It's possible that the offers from your other agents are
>> insufficient in some way. The Mesos MASTER log should show offers being
>> declined by your Spark Driver, do you see that?  If you have DEBUG level
>> logging in your Spark driver you should also see offers being declined
>> 
>> there. Finally if your Spark framework isn't receiving any resource offers,
>> it could be because of the roles you have established on your agents or
>> quota set on other frameworks, have you set up any of that? Hope this helps!
>>
>> Art
>>
>> On Tue, Dec 5, 2017 at 10:45 PM, Ji Yan  wrote:
>>
>>> Hi all,
>>>
>>> I am running Spark 2.0 on Mesos 1.1. I was trying to split up my job
>>> onto several nodes. I try to set the number of executors by the formula
>>> (spark.cores.max / spark.executor.cores). The behavior I saw was that Spark
>>> will try to fill up on one mesos node as many executors as it can, then it
>>> stops going to other mesos nodes despite that it has not done scheduling
>>> all the executors I have asked it to yet! This is super weird!
>>>
>>> Did anyone notice this behavior before? Any help appreciated!
>>>
>>> Ji
>>>
>>> The information in this email is confidential and may be legally
>>> privileged. It is intended solely for the addressee. Access to this email
>>> by anyone else is unauthorized. If you are not the intended recipient, any
>>> disclosure, copying, distribution or any action taken or omitted to be
>>> taken in reliance on it, is prohibited and may be unlawful.
>>>
>>
>>
>
> The information in this email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful.
>



-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Streaming Analytics/BI tool to connect Spark SQL

2017-12-07 Thread umargeek
Hi All,

We are currently looking for real-time streaming analytics of data stored as
Spark SQL tables. Is there any external connectivity available to connect
with BI tools (Pentaho/Arcadia)?

Currently, we are storing data into hive tables, but its response on the
Arcadia dashboard is slow.

Looking for suggestions on whether to move away from hive, or any connectivity
for Spark SQL, or to Ignite?

Thanks,
Umar



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to write dataframe to kafka topic in spark streaming application using pyspark other than collect?

2017-12-07 Thread umargeek
Hi Team,

Can someone please advise me on the above post? Because of this I have
written the data file to an HDFS location.
So as of now I am just passing the filename into the Kafka topic and not utilizing
Kafka's potential at its best. Looking forward to suggestions.

Thanks,
Umar



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Do I need to do .collect inside forEachRDD

2017-12-07 Thread kant kodali
Hi Richard,

I had tried your sample code now and several times in the past as well. The
problem seems to be kafkaProducer is not serializable. so I get "Task not
serializable exception" and my kafkaProducer object is created using the
following jar.

group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'

On Thu, Dec 7, 2017 at 2:46 AM, Qiao, Richard 
wrote:

> Thanks for sharing the code.
>
> The 1st problem in the first code is the map is allocated in Driver, but
> it’s trying to put data in Executors, then retrieve it in driver to send to
> Kafka.
>
> You are using this map as accumulator’s feature, but it doesn’t work in
> this way.
>
>
>
> The 2nd problem is both codes are trying to collect rdd level data to
> generate a single Json string then send to Kafka, which could cause very
> long json string if your TPS is very high.
>
> If possible you can send smaller json strings at task level, such as:
>
> .foreachRDD(stringLongJavaPairRDD -> {
>
>   stringLongJavaPairRDD.foreachPartition{partition ->{
>
>   Map map = new HashMap<>(); //Defined in a task
>
>   partition.foreach(stringLongTuple2 -> {
>
> map.put(stringLongTuple2._1(), stringLongTuple2._2())
>
>   });
>
>   producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
> // send smaller json in a task
>
> }
>
>   }
>
> });
>
> When you do it, make sure kafka producer (seek kafka sink for it) and
> gson’s environment setup correctly in executors.
>
>
>
> If after this, there is still OOM, let’s discuss further.
>
>
>
> Best Regards
>
> Richard
>
>
>
>
>
> From: kant kodali 
> Date: Thursday, December 7, 2017 at 2:30 AM
> To: Gerard Maas 
> Cc: "Qiao, Richard" , "user @spark" <
> user@spark.apache.org>
> Subject: Re: Do I need to do .collect inside forEachRDD
>
>
>
> @Richard I had pasted the two versions of the code below and I still
> couldn't figure out why it wouldn't work without .collect ?  Any help would
> be great
>
>
>
>
>
> The code below doesn't work and sometimes I also run into an OutOfMemory
> error.
>
> jsonMessagesDStream
> .window(new Duration(6), new Duration(1000))
> .mapToPair(val -> {
>   JsonParser parser = new JsonParser();
>   JsonObject jsonObj = parser.parse(val).getAsJsonObject();
>   if (jsonObj.has("key4")) {
> return new Tuple2<>("", 0L);
>   }
>   String symbol = jsonObj.get("key1").getAsString();
>   long quantity = jsonObj.get("key2").getAsLong();
>   return new Tuple2<>(symbol, quantity);
> })
> .reduceByKey((v1, v2) -> v1 + v2)
> .foreachRDD(stringLongJavaPairRDD -> {
> Map<String, Long> map = new HashMap<>();
> stringLongJavaPairRDD.foreach(stringLongTuple2 -> {
>
> System.out.println(stringLongTuple2._1()); // Works I can see
> values getting printed out
>
> System.out.println(stringLongTuple2._2()); // Works I can see
> values getting printed out
>
> map.put(stringLongTuple2._1(), stringLongTuple2._2())
>
> });
>
> System.out.println(gson.toJson(map)); // Prints empty json doc
> string "{}" always. But why? especially
>
> // when the map is getting filled values as confirmed by the print
> statements above
>
> producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
> });
>
> jssc.start();
>
> jssc.awaitTermination();
>
>
>
>   VS
>
> The below code works but it is slow because of .collectAsMap
>
>
>
> jsonMessagesDStream
> .window(new Duration(6), new Duration(1000))
> .mapToPair(val -> {
>   JsonParser parser = new JsonParser();
>   JsonObject jsonObj = parser.parse(val).getAsJsonObject();
>   if (jsonObj.has("key4")) {
> return new Tuple2<>("", 0L);
>   }
>   String symbol = jsonObj.get("key1").getAsString();
>   long quantity = jsonObj.get("key2").getAsLong();
>   return new Tuple2<>(symbol, quantity);
> })
> .reduceByKey((v1, v2) -> v1 + v2)
> .foreachRDD(stringLongJavaPairRDD -> {
>
> LinkedHashMap<String, Long> map = new
> LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());
>
> producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
>
> });
>
> jssc.start();
>
> jssc.awaitTermination();
>
>
>
>
>
>
>
> On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas  wrote:
>
> Hi Kant,
>
>
>
> >  but would your answer on .collect() change depending on running the
> spark app in client vs cluster mode?
>
>
>
> No, it should make no difference.
>
>
>
> -kr, Gerard.
>
>
>
> On Tue, Dec 5, 2017 at 11:34 PM, kant kodali  wrote:
>
> @Richard I don't see any error in the executor log but let me run again to
> make sure.
>
>
>
> @Gerard Thanks much!  but would your answer on .collect() change depending
> on running the spark app in client vs cluster mode?
>
>
>
> Thanks!
>
>
>
> On Tue, Dec 5, 2017 at 1:54 PM, Ger

Re: Do I need to do .collect inside forEachRDD

2017-12-07 Thread Qiao, Richard
Thanks for sharing the code.
The 1st problem in the first code is that the map is allocated in the Driver, but
the code is trying to put data into it on the Executors and then retrieve it in the
driver to send to Kafka. You are using this map as if it were an accumulator, but it
doesn’t work that way.

The 2nd problem is that both versions of the code collect RDD-level data to
generate a single JSON string and then send it to Kafka, which could produce a
very long JSON string if your TPS is very high.
If possible you can send smaller JSON strings at the task level, such as:
.foreachRDD(stringLongJavaPairRDD -> {
    stringLongJavaPairRDD.foreachPartition(partition -> {
        Map<String, Long> map = new HashMap<>(); // defined per task
        partition.forEachRemaining(stringLongTuple2 ->
            map.put(stringLongTuple2._1(), stringLongTuple2._2()));
        // send a smaller json per task
        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });
});
When you do it, make sure the kafka producer (see the kafka sink pattern for it)
and gson are set up correctly in the executors.

If after this, there is still OOM, let’s discuss further.

Best Regards
Richard


From: kant kodali 
Date: Thursday, December 7, 2017 at 2:30 AM
To: Gerard Maas 
Cc: "Qiao, Richard" , "user @spark" 

Subject: Re: Do I need to do .collect inside forEachRDD

@Richard I had pasted the two versions of the code below and I still couldn't 
figure out why it wouldn't work without .collect ?  Any help would be great


The code below doesn't work and sometime I also run into OutOfMemory error.

jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long quantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {
Map<String, Long> map = new HashMap<>();
stringLongJavaPairRDD.foreach(stringLongTuple2 -> {

System.out.println(stringLongTuple2._1()); // Works I can see 
values getting printed out

System.out.println(stringLongTuple2._2()); // Works I can see 
values getting printed out

map.put(stringLongTuple2._1(), stringLongTuple2._2())

});

System.out.println(gson.toJson(map)); // Prints empty json doc string 
"{}" always. But why? especially

// when the map is getting filled values as confirmed by the print 
statements above

producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
});

jssc.start();

jssc.awaitTermination();



  VS

The below code works but it is slow because .collectAsMap



jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long quantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {

LinkedHashMap<String, Long> map = new
LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());

producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));

});

jssc.start();

jssc.awaitTermination();





On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas 
mailto:gerard.m...@gmail.com>> wrote:
Hi Kant,

>  but would your answer on .collect() change depending on running the spark 
> app in client vs cluster mode?

No, it should make no difference.

-kr, Gerard.

On Tue, Dec 5, 2017 at 11:34 PM, kant kodali 
mailto:kanth...@gmail.com>> wrote:
@Richard I don't see any error in the executor log but let me run again to make 
sure.

@Gerard Thanks much!  but would your answer on .collect() change depending on 
running the spark app in client vs cluster mode?

Thanks!

On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas 
mailto:gerard.m...@gmail.com>> wrote:
The general answer to your initial question is that "it depends". If the 
operation in the rdd.foreach() closure can be parallelized, then you don't need 
to collect first. If it needs some local context (e.g. a socket connection), 
then you need to do rdd.collect first to bring the data locally, which has a 
perf penalty and also is restricted to the memory size of the driver process.

Given the further clarification:
>Reads from Kafka and outputs to Kafka. so I check the output from Kafka.

If it's writing to Kafka, that operation can be done in a distributed form.

You could use this lib: https://github.com/BenFradet/spark-kafka-writer

Or, if you can upgrade to Spark 2.2 version, you c

Re: LDA and evaluating topic number

2017-12-07 Thread Stephen Boesch
I have been testing on the 20 NewsGroups dataset - which the Spark docs
themselves reference.  I can confirm that perplexity increases and
likelihood decreases as topics increase - and am similarly confused by
these results.
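For reference, a small sketch of this kind of sweep using the spark.ml LDA API,
assuming a DataFrame named dataset with a vectorized "features" column (e.g. from
CountVectorizer); the k values and iteration count are illustrative:

import org.apache.spark.ml.clustering.LDA

// dataset: DataFrame with a Vector column named "features"
for (k <- Seq(5, 10, 20, 40)) {
  val model = new LDA().setK(k).setMaxIter(20).setFeaturesCol("features").fit(dataset)
  val ll = model.logLikelihood(dataset)   // higher is better, on the training set
  val lp = model.logPerplexity(dataset)   // lower is better
  println(s"k=$k logLikelihood=$ll logPerplexity=$lp")
}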

2017-09-28 10:50 GMT-07:00 Cody Buntain :

> Hi, all!
>
> Is there an example somewhere on using LDA’s logPerplexity()/logLikelihood()
> functions to evaluate topic counts? The existing MLLib LDA examples show
> calling them, but I can’t find any documentation about how to interpret the
> outputs. Graphing the outputs for logs of perplexity and likelihood aren’t
> consistent with what I expected (perplexity increases and likelihood
> decreases as topics increase, which seem odd to me).
>
> An example of what I’m doing is here: http://www.cs.umd.edu/~
> cbuntain/FindTopicK-pyspark-regex.html
>
> Thanks very much in advance! If I can figure this out, I can post example
> code online, so others can see how this process is done.
>
> -Best regards,
> Cody
> _
> Cody Buntain, PhD
> Postdoc, @UMD_CS
> Intelligence Community Postdoctoral Fellow
> cbunt...@cs.umd.edu
> www.cs.umd.edu/~cbuntain
>
>