Spark Structured Streaming for Twitter Streaming data

2018-01-30 Thread Divya Gehlot
Hi,
I am exploring Spark Structured Streaming.
When I turned to the internet to understand it, I found that it is mostly
integrated with Kafka or other streaming sources such as Kinesis.
I could not find how we can use the Spark Streaming API for Twitter streaming
data.
I would be grateful if anybody who has used it or done some work on it could guide me.
Pardon me if I have understood it wrongly.
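
A minimal sketch of one way to consume Twitter data with the DStream API, since
Structured Streaming has no built-in Twitter source. This assumes the Apache
Bahir spark-streaming-twitter connector (org.apache.bahir:spark-streaming-twitter_2.11)
is on the classpath, and the OAuth values below are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// twitter4j reads its OAuth credentials from system properties
System.setProperty("twitter4j.oauth.consumerKey", "<consumer-key>")
System.setProperty("twitter4j.oauth.consumerSecret", "<consumer-secret>")
System.setProperty("twitter4j.oauth.accessToken", "<access-token>")
System.setProperty("twitter4j.oauth.accessTokenSecret", "<access-token-secret>")

val conf = new SparkConf().setAppName("twitter-stream").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(15))

// None = build the twitter4j Authorization from the system properties above
val tweets = TwitterUtils.createStream(ssc, None)
tweets.map(_.getText).print()

ssc.start()
ssc.awaitTermination()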

Thanks,
Divya


Re: Issue with Cast in Spark Sql

2018-01-30 Thread naresh Goud
Spark/Hive converts the value to null when the target decimal type cannot hold
it, i.e. when the scale leaves fewer integer digits than the data needs. The
example below gives you the details. I am not sure why it converts to null
instead of raising an error.
Note: you need to trim the string before casting it to decimal.

Table data with col1 and col2 columns


 val r = sqlContext.sql("select col2  from nd2629.test")
+--------+
|    col2|
+--------+
|    1.00|
|     2.0|
| 123.798|
|123456.6|
+--------+



val r = sqlContext.sql("select CAST(TRIM(col2) as decimal(10,4)) from
nd2629.test")

+-----------+
|        _c0|
+-----------+
|     1.0000|
|     2.0000|
|   123.7980|
|123456.6778|
+-----------+



 val r = sqlContext.sql("select CAST(TRIM(col2) as decimal(10,5)) from
nd2629.test")
+---------+
|      _c0|
+---------+
|  1.00000|
|  2.00000|
|123.79800|
|     null|
+---------+


The scale you specify must leave enough room for the integer part: with
decimal(p,s), only p - s digits are available before the decimal point.

In the example above the largest value, 123456.x, has 6 integer digits, so with
precision 10 the scale can be at most 4. That is why decimal(10,4) works while
decimal(10,5) overflows and returns null.
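
A minimal, self-contained sketch of the same behavior (assuming a local
SparkSession; the sample values are built inline rather than read from
nd2629.test):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("decimal-cast-demo").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq("1.00", "2.0", "123.798", "123456.6778").toDF("col2")
df.select(
  col("col2").cast("decimal(10,4)").as("d10_4"),  // 10 - 4 = 6 integer digits: every row fits
  col("col2").cast("decimal(10,5)").as("d10_5")   // 10 - 5 = 5 integer digits: 123456.x overflows to null
).show()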


Thank you,
Naresh




On Tue, Jan 30, 2018 at 8:48 PM, Arnav kumar  wrote:

> Hi Experts
>
> I am trying to convert a string containing a decimal value to a decimal type
> in Spark SQL and load it into Hive/SQL Server.
>
> In Hive, instead of being converted to decimal, all my values come out as
> null.
>
> In SQL Server, the values come out without the expected precision.
>
> Can you please let me know if this is some kind of limitation?
>
> Here is my code
>
>
> //select the required columns from actual data frame
> val query ="""select eventId,
> cast(eventData.latitude as Decimal(10,10)) as Latitude,
> cast(eventData.longitude as Decimal(10,10)) as Longitude from event"""
>
> //creating event data frame
> val eventTableDF = sparkSession.sql(query)
> //printing the schema for debugging purpose
> eventTableDF.printSchema()
>
> root
>  |-- eventId: string (nullable = true)
>  |-- Latitude: decimal(10,10) (nullable = true)
>  |-- Longitude: decimal(10,10) (nullable = true)
>
>
>
>  val eventTableDF = sparkSession.sql(query)
>   import sparkSession.implicits._
> eventTableDF.write.mode(org.apache.spark.sql.SaveMode.Append).insertInto(eventTable)
>
>
>
>
>
> With Best Regards
> Arnav Kumar
>
>
>
>


why groupByKey still shuffle if SQL does "Distribute By" on same columns ?

2018-01-30 Thread Dibyendu Bhattacharya
Hi,

I am trying something like this:

val sesDS: Dataset[XXX] = hiveContext.sql(select).as[XXX]

The select statement is something like this: "select * from sometable
DISTRIBUTE BY col1, col2, col3"

Then comes groupByKey...

val gpbyDS = sesDS.groupByKey(x => (x.col1, x.col2, x.col3))

Since my select already distributes the data by the same columns that I use in
groupByKey, why does groupByKey still shuffle? Is this an issue, or am I
missing something?
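
One way to see whether an extra shuffle is actually planned is to inspect the
physical plan; a small sketch assuming the same sesDS/gpbyDS names as above:

// KeyValueGroupedDataset.count() turns the grouping into a Dataset we can explain
val counted = gpbyDS.count()
counted.explain(true)  // look for Exchange hashpartitioning(...) nodes in the physical plan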

Regards,
Dibyendu


Issue with Cast in Spark Sql

2018-01-30 Thread Arnav kumar
Hi Experts

I am trying to convert a string containing a decimal value to a decimal type
in Spark SQL and load it into Hive/SQL Server.

In Hive, instead of being converted to decimal, all my values come out as
null.

In SQL Server, the values come out without the expected precision.

Can you please let me know if this is some kind of limitation?

Here is my code


//select the required columns from actual data frame
val query ="""select eventId,
cast(eventData.latitude as Decimal(10,10)) as Latitude,
cast(eventData.longitude as Decimal(10,10)) as Longitude from event"""

//creating event data frame
val eventTableDF = sparkSession.sql(query)
//printing the schema for debugging purpose
eventTableDF.printSchema()

root
 |-- eventId: string (nullable = true)
 |-- Latitude: decimal(10,10) (nullable = true)
 |-- Longitude: decimal(10,10) (nullable = true)



 val eventTableDF = sparkSession.sql(query)
  import sparkSession.implicits._

eventTableDF.write.mode(org.apache.spark.sql.SaveMode.Append).insertInto(eventTable)
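
For context, a short sketch (not from the original post; the coordinates and
the decimal(12,8) target type are illustrative) showing why decimal(10,10)
cannot hold typical latitude/longitude values: it leaves 10 - 10 = 0 digits
for the integer part, so any value of 1 or more casts to null.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().appName("coord-decimal-demo").master("local[*]").getOrCreate()
import spark.implicits._

val coords = Seq(("e1", "47.60621", "-122.33207")).toDF("eventId", "latitude", "longitude")
coords.select(
  expr("cast(latitude as decimal(10,10))").as("lat_10_10"),  // null: 47 needs 2 integer digits
  expr("cast(latitude as decimal(12,8))").as("lat_12_8")     // 47.60621000
).show()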





With Best Regards
Arnav Kumar


Re: ML:One vs Rest with crossValidator for multinomial in logistic regression

2018-01-30 Thread Bryan Cutler
Hi Michelle,

Your original usage of ParamGridBuilder was not quite right: `addGrid`
expects (a parameter, an array of values for that parameter). If you want
to do a grid search over different regularization values, you would do the
following:

val paramMaps = new ParamGridBuilder().addGrid(logist.regParam, Array(0.1,
0.3)).build()

* don't forget to build the grid after adding values
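
Putting that together with CrossValidator, a sketch (the evaluator, fold
count, and trainingDF are illustrative placeholders, not from the original
thread):

import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val logist = new LogisticRegression()
val ova = new OneVsRest().setClassifier(logist)

// grid over the base classifier's regularization parameter
val paramMaps = new ParamGridBuilder()
  .addGrid(logist.regParam, Array(0.1, 0.3))
  .build()

val cv = new CrossValidator()
  .setEstimator(ova)
  .setEstimatorParamMaps(paramMaps)
  .setEvaluator(new MulticlassClassificationEvaluator())
  .setNumFolds(3)

// val cvModel = cv.fit(trainingDF)  // trainingDF: a DataFrame with "label" and "features" columns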

On Tue, Jan 30, 2018 at 6:55 AM, michelleyang 
wrote:

> I tried to use One vs Rest in Spark ML with a pipeline and crossValidator for
> multinomial logistic regression.
>
> It came out with empty coefficients. I figured out it was the setting of
> ParamGridBuilder. Can anyone help me understand how the parameter setting
> affects the crossValidator process?
>
> the original code: // outputs empty coefficients
>
> val logist=new LogisticRegression
>
> val ova = new OneVsRest().setClassifier(logist)
>
> val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
> Array(logist.getRegParam))
>
> New code: // outputs multi-class coefficients
>
> val logist=new LogisticRegression
>
> val ova = new OneVsRest().setClassifier(logist)
>
> val classifier1 = new LogisticRegression().setRegParam(2.0)
>
> val classifier2 = new LogisticRegression().setRegParam(3.0)
>
> val paramMaps = new ParamGridBuilder() .addGrid(ova.classifier,
> Array(classifier1, classifier2))
>
> Please help Thanks.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: spark job error

2018-01-30 Thread Jacek Laskowski
Hi,

Start with spark.executor.memory set to 2g. You may also give
spark.yarn.executor.memoryOverhead a try.

See https://spark.apache.org/docs/latest/configuration.html and
https://spark.apache.org/docs/latest/running-on-yarn.html for more in-depth
information.
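
For reference, a sketch of where those properties can be set programmatically
(the values are illustrative, not tuning advice; on EMR/Zeppelin they would
normally go into the interpreter or spark-defaults configuration instead):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("zeppelin-job")
  .config("spark.executor.memory", "2g")                // executor heap
  .config("spark.yarn.executor.memoryOverhead", "512")  // off-heap overhead in MB (Spark 2.x property name)
  .getOrCreate()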

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Tue, Jan 30, 2018 at 5:52 PM, shyla deshpande 
wrote:

> I am running Zeppelin on EMR with the default settings. I am getting the
> following error. Restarting the Zeppelin application fixes the problem.
>
> What default settings do I need to override to fix this error?
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 71 in stage 231.0 failed 4 times, most recent failure: Lost task 71.3 in
> stage 231.0 Reason: Container killed by YARN for exceeding memory limits.
> 1.4 GB of 1.4 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
> Thanks
>
>


spark job error

2018-01-30 Thread shyla deshpande
I am running Zeppelin on EMR with the default settings. I am getting the
following error. Restarting the Zeppelin application fixes the problem.

What default settings do I need to override to fix this error?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 71
in stage 231.0 failed 4 times, most recent failure: Lost task 71.3 in stage
231.0 Reason: Container killed by YARN for exceeding memory limits. 1.4 GB
of 1.4 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.

Thanks


[Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-01-30 Thread LongVehicle
Hello everyone,

We are running Spark Streaming jobs on Spark 2.1 in cluster mode on YARN. We
have an RDD (3 GB) that we refresh periodically (every 30 minutes) by reading
from HDFS. Namely, we create a DataFrame df using sqlContext.read.parquet, and
then we create RDD rdd = df.as[T].rdd. The first unexpected thing is that
although df has a parallelism of 90 (because we read that many HDFS files),
rdd has a parallelism of 18 (executors x cores = 9 x 2 in our setup). In the
final stage, we repartition rdd using a HashPartitioner with a parallelism of
18 (we denote the result final_rdd), and cache it using MEMORY_ONLY_SER for
30 minutes. We repartition using the same key that was used to partition the
HDFS files in the first place. Finally, we leftOuterJoin a DStream of 9 Kafka
partitions (about 300 MB in total) with final_rdd (3 GB). This DStream is
partitioned by the same (join) key. Our batch interval is 15 seconds, and we
read new data from Kafka in each batch.

We noticed that final_rdd is sometimes (non-deterministically) scheduled
unevenly across executors, and sometimes only 1 or 2 executors execute all
the tasks. The problem with the uneven assignment is that it persists until
we reload the HDFS data (for half an hour). Why is this happening? We are
aware that Spark uses locality when assigning tasks to executors, but we also
tried setting spark.shuffle.reduceLocality.enabled=false. Unfortunately, this
did not help, neither for rdd nor for final_rdd.
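
For clarity, a sketch of the refresh-and-repartition part described above (the
class, path, and key names are illustrative placeholders, not the actual job):

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

case class T(key: String, payload: String)  // placeholder schema

val spark = SparkSession.builder().appName("refresh-demo").getOrCreate()
import spark.implicits._

val df  = spark.read.parquet("hdfs:///path/to/data")    // ~90 input files in the setup above
val rdd = df.as[T].rdd
println(s"df partitions:  ${df.rdd.getNumPartitions}")  // 90 in the setup above
println(s"rdd partitions: ${rdd.getNumPartitions}")     // 18 (executors x cores) in the setup above

val finalRdd = rdd
  .keyBy(_.key)
  .partitionBy(new HashPartitioner(18))                 // same key as the HDFS partitioning
  .persist(StorageLevel.MEMORY_ONLY_SER)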

Any ideas how to address the problem?

Many thanks!



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Re: use kafka streams API aggregate ?

2018-01-30 Thread 446463...@qq.com
Oh, sorry.
I mean: just use the Kafka Streams API to do the aggregation, not depending on Spark.



446463...@qq.com
 
From: 郭鹏飞
Sent: 2018-01-30 23:06
To: 446463...@qq.com
Cc: user
Subject: Re: use kafka streams API aggregate ?

Hi,
I did the same today.


Check your Kafka version, then follow one of the guides below.


http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html


http://spark.apache.org/docs/latest/streaming-kafka-integration.html







On Jan 30, 2018, at 10:48 PM, 446463...@qq.com wrote:

Hi
I am new to Kafka.
Today I am trying to use the Kafka Streams API for real-time data processing,
and I have no idea how to go about it.
Can someone help me?



446463...@qq.com



Re: use kafka streams API aggregate ?

2018-01-30 Thread 郭鹏飞

Hi,
I did the same today.


Check your Kafka version, then follow one of the guides below.


http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html


http://spark.apache.org/docs/latest/streaming-kafka-integration.html







> On Jan 30, 2018, at 10:48 PM, 446463...@qq.com wrote:
> 
> Hi
> I am new to Kafka.
> Today I am trying to use the Kafka Streams API for real-time data processing,
> and I have no idea how to go about it.
> Can someone help me?
> 
> 446463...@qq.com 


ML:One vs Rest with crossValidator for multinomial in logistic regression

2018-01-30 Thread michelleyang
I tried to use One vs Rest in Spark ML with a pipeline and crossValidator for
multinomial logistic regression.

It came out with empty coefficients. I figured out it was the setting of
ParamGridBuilder. Can anyone help me understand how the parameter setting
affects the crossValidator process?

the original code: // outputs empty coefficients

val logist=new LogisticRegression

val ova = new OneVsRest().setClassifier(logist)

val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
Array(logist.getRegParam))

New code: // outputs multi-class coefficients

val logist=new LogisticRegression

val ova = new OneVsRest().setClassifier(logist)

val classifier1 = new LogisticRegression().setRegParam(2.0)

val classifier2 = new LogisticRegression().setRegParam(3.0)

val paramMaps = new ParamGridBuilder() .addGrid(ova.classifier,
Array(classifier1, classifier2))

Please help Thanks.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



use kafka streams API aggregate ?

2018-01-30 Thread 446463...@qq.com
Hi
I am new to Kafka.
Today I am trying to use the Kafka Streams API for real-time data processing,
and I have no idea how to go about it.
Can someone help me?



446463...@qq.com


Re: mapGroupsWithState in Python

2018-01-30 Thread ayan guha
Any help would be much appreciated :)
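
For reference, a minimal Scala sketch of the keep-the-latest-record state
logic described in the quoted message below; to the best of my knowledge
mapGroupsWithState is not exposed in PySpark as of Spark 2.x, and the Record
schema and names here are illustrative:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Record(id: String, lastUpdated: Timestamp, attribute: String)  // illustrative schema

def keepLatest(id: String, rows: Iterator[Record], state: GroupState[Record]): Record = {
  if (state.hasTimedOut) {
    val last = state.get
    state.remove()                       // no updates for 24 hours: drop the state for this id
    last
  } else {
    val newest = (rows ++ state.getOption.iterator).maxBy(_.lastUpdated.getTime)
    state.update(newest)                 // keep only the record with the highest last_updated
    state.setTimeoutDuration("24 hours")
    newest
  }
}

// records: Dataset[Record] parsed from the Kinesis source, with
// import spark.implicits._ in scope for the encoders; the sink must use
// output mode "update".
// val deduped = records
//   .groupByKey(_.id)
//   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(keepLatest)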

On Mon, Jan 29, 2018 at 6:25 PM, ayan guha  wrote:

> Hi
>
> I want to write something in Structured streaming:
>
> 1. I have a dataset which has 3 columns: id, last_update_timestamp,
> attribute
> 2. I am receiving the data through Kinesis
>
> I want to deduplicate records based on last_updated. In batch, it looks
> like:
>
> spark.sql("select * from (Select *, row_number() OVER(Partition by id
> order by last_updated desc) rank  from table1) tmp where rank =1")
>
> But now I would like to do it in Structured Streaming. I need to maintain,
> for each id, the state with the highest last_updated across triggers, for a
> certain period (24 hours).
>
> Questions:
>
> 1. Should I use mapGroupsWithState or is there any other (SQL?) solution?
> Can anyone help me to write it?
> 2. Is mapGroupsWithState supported in Python?
>
>  Just to ensure we cover bases, I have already tried using dropDuplicates,
> but it is keeping the 1st record encountered for an Id, not updating the
> state:
>
> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
> dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.header.id').alias('id'),
>   get_json_object(unpackedDF.jsonData,
> '$.header.last_updated').cast('timestamp').alias('last_updated'),
>   unpackedDF.jsonData)
>
> dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated', '24 hours')
>
>
> So it is not working. Any help is appreciated.
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha


Data Integration with Chinese Social Media Sites

2018-01-30 Thread Sanjay Kulkarni
Hi All,

One of the requirements in our project is to integrate data from Chinese
social media platforms: WeChat, Weibo, and Baidu (Search).

Has anyone done this using Spark? Are there any pre-built connectors
available, and what challenges are involved in setting them up?
I am sure there could be multiple challenges, but it would be good if you
could share some of them.

Appreciate, if you can share details.

Thanks!
Sanjay


spark.sql.adaptive.enabled has no effect

2018-01-30 Thread 张万新
Hi there,

As far as I know, when spark.sql.adaptive.enabled is set to true, the number
of post-shuffle partitions should change with the map output size. But in my
application there is a stage reading 900 GB of shuffled files with only 200
partitions (the default value of spark.sql.shuffle.partitions), and I verified
that the number of post-shuffle partitions is always the same as the value of
spark.sql.shuffle.partitions. Additionally, I left
spark.sql.adaptive.shuffle.targetPostShuffleInputSize at its default. Have I
made a mistake somewhere, and what is the correct behavior?
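
For reference, a sketch of where those settings live (Spark 2.x property
names; the values are illustrative, not a recommendation):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("adaptive-shuffle-demo")
  .config("spark.sql.adaptive.enabled", "true")
  // target bytes per post-shuffle partition used by the adaptive coordinator (128 MB here)
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "134217728")
  .config("spark.sql.shuffle.partitions", "200")
  .getOrCreate()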

Thanks


[Doubt] GridSearch for Hyperparameter Tuning in Spark

2018-01-30 Thread Aakash Basu
Hi,

Is there any PySpark ML or MLlib API available for grid search, similar to
GridSearchCV from sklearn.model_selection?

I found this: https://spark.apache.org/docs/2.2.0/ml-tuning.html, but it
covers cross-validation and train-validation split for hyperparameter tuning,
not pure grid search.

Any help?

Thanks,
Aakash.