Re: [Spark Streaming] How to do join two messages in spark streaming(Probabaly messasges are in differnet RDD) ?

2016-12-06 Thread Tathagata Das
This sounds like something you can solve with a stateful operator; check out
mapWithState. If both messages can be keyed by a common key, then you can
define a keyed state. The state will have a field for the first message: when
you see the first message for a key, fill that field with its timestamp, etc.
Then, when the second message for the same key arrives, Spark Streaming will
call your state update function with the old state (i.e. with the first
message filled in), and you can take the time difference.

Check out my blog -
https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html
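
For what it's worth, a minimal sketch of that keyed-state approach (my own
illustration, not code from this reply; the key type, value type and field
names are assumptions):

import org.apache.spark.streaming.{State, StateSpec}

case class FirstSeen(timestamp: Long)

// events: DStream[(String, Long)] keyed by the common key, value = event time in ms (assumed shape)
def pairUp(key: String, eventTime: Option[Long], state: State[FirstSeen]): Option[(String, Long)] = {
  (state.getOption(), eventTime) match {
    case (None, Some(ts)) =>           // first message for this key: remember its timestamp
      state.update(FirstSeen(ts))
      None
    case (Some(first), Some(ts)) =>    // second message: emit the time difference and clear the state
      state.remove()
      Some((key, ts - first.timestamp))
    case _ => None
  }
}

// val timeDiffs = events.mapWithState(StateSpec.function(pairUp _))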

On Tue, Dec 6, 2016 at 5:50 PM, sancheng  wrote:

> any valuable feedback is appreciated!
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-How-to-do-join-two-messages-in-spark-streaming-Probabaly-messasges-are-in-differnet--tp28161p28163.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Adding a new nested column into dataframe

2016-12-06 Thread AShaitarov
Hello Spark experts!

I need to add one more nested column to existing ones. For example:

Initial DF schema looks like that:

|-- Id: string (nullable = true)
|-- Type: string (nullable = true)
|-- Uri: string (nullable = true)
|-- attributes: struct (nullable = false)
| |-- CountryGroupID: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- CountryGroupDesc: array (nullable = true)
| | |-- element: string (containsNull = true)

But I want to add one more nested column:

|-- Id: string (nullable = true)
|-- Type: string (nullable = true)
|-- Uri: string (nullable = true)
|-- attributes: struct (nullable = false)
| |-- CountryGroupID: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- CountryGroupDesc: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- Status: array (nullable = true)
| | |-- element: string (containsNull = true)

I am working on Spark 1.6.2. I would be really grateful for any help.
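
For reference, one possible approach in 1.6 is to rebuild the attributes
struct with the extra field, since a nested struct cannot be extended in
place (a sketch, not from the thread; the constant Status value is purely
illustrative):

import org.apache.spark.sql.functions.{array, col, lit, struct}

val withStatus = df.withColumn(
  "attributes",
  struct(
    col("attributes.CountryGroupID").as("CountryGroupID"),
    col("attributes.CountryGroupDesc").as("CountryGroupDesc"),
    array(lit("active")).as("Status")   // hypothetical constant value, for illustration only
  )
)
withStatus.printSchema()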

Thank you in advance.


Best regards,

Artem






Re: Terminate job without killing

2016-12-06 Thread Leonid Blokhin
Hi, Bruno!

You can send a message to an MQTT topic when the job finishes. This can be
done with the help of the Mist service, https://github.com/Hydrospheredata/mist,
or in a similar way.
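
For illustration, a minimal sketch of publishing such a completion message
(my own, assuming the Eclipse Paho MQTT client is on the classpath; broker
URL and topic name are hypothetical):

import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}

// Publish a small "job finished" notification once the Spark job completes.
val client = new MqttClient("tcp://mqtt-broker:1883", MqttClient.generateClientId())
client.connect()
client.publish("jobs/status", new MqttMessage("my-job finished".getBytes("UTF-8")))
client.disconnect()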

Regards,

Leonid


On 7 Dec 2016 at 6:03, "Bruno Faria"  wrote:

I have a Python Spark job that runs successfully but never ends (never releases
the prompt). I get messages like "releasing accumulator" but never the
expected shutdown message, and the prompt is never released.


In order to handle this I used sys.exit(0). Now it works, but the tasks
always appear as KILLED and I can't control or monitor whether the job ended
successfully or not.


Basically I have 2 questions:

1 - Is sys.exit(0) the best way to end a job, or am I missing something (I
heard sc.stop() is not a good approach)?

2 - How can I make sure the job finished successfully or not (the idea is to
use Airflow to monitor that)?


Any help is really appreciated.


Thanks


Dynamically applying schema in spark.

2016-12-06 Thread Satwinder Singh
Hi Spark Team,
We are working on a use case where we need to parse an Avro JSON schema (the
JSON is a nested schema) using Apache Spark 1.6 and Scala 2.10.2.
As of now we are able to deflate the JSON from Avro, read it, and bring it into a
Hive table. Our requirement is that the Avro JSON can add new fields, and we want
to handle such a scenario dynamically. Is there any way to infer the schema
beforehand so that we can manage it dynamically when new fields get added?
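
For reference, a hedged sketch (an illustration, not from the original mail;
paths are hypothetical) of inferring the schema from a sample first and then
applying it explicitly, so newly added fields show up in the inferred StructType:

// Infer the schema from a sample of the deflated Avro JSON
val sample = sqlContext.read.json(sc.textFile("/tmp/avro_json_sample"))
val inferredSchema = sample.schema              // StructType, includes any newly added fields

// Reuse the inferred schema on subsequent loads
val df = sqlContext.read.schema(inferredSchema).json(sc.textFile("/tmp/avro_json_batch"))
df.printSchema()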

Regards, 
Satwinder

Terminate job without killing

2016-12-06 Thread Bruno Faria
I have a Python Spark job that runs successfully but never ends (never releases the 
prompt). I get messages like "releasing accumulator" but never the expected shutdown 
message, and the prompt is never released.


In order to handle this I used sys.exit(0). Now it works, but the tasks always 
appear as KILLED and I can't control or monitor whether the job ended successfully 
or not.


Basically I have 2 questions:

1 - Is sys.exit(0) the best way to end a job, or am I missing something (I heard 
sc.stop() is not a good approach)?

2 - How can I make sure the job finished successfully or not (the idea is to use 
Airflow to monitor that)?


Any help is really appreciated.


Thanks


Re: [Spark Streaming] How to do join two messages in spark streaming(Probabaly messasges are in differnet RDD) ?

2016-12-06 Thread sancheng
any valuable feedback is appreciated!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-How-to-do-join-two-messages-in-spark-streaming-Probabaly-messasges-are-in-differnet--tp28161p28163.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



OutOfMemoryError while running job...

2016-12-06 Thread Kevin Burton
I am trying to run a Spark job which reads from ElasticSearch and should
write its output back to a separate ElasticSearch index. Unfortunately I
keep getting `java.lang.OutOfMemoryError: Java heap space` exceptions. I've
tried running it with: --conf spark.memory.offHeap.enabled=true --conf
spark.memory.offHeap.size=2147483648 --conf spark.executor.memory=4g. That
didn't help though.

I use Spark version: 2.0.0, 55 worker nodes, ElasticSearch version: 2.3.3,
Scala version 2.11.8, Java 1.8.0_60.

scala> unique_authors.saveToEs("top_users_2016_11_29_to_2016_12_05/user")
[Stage 1:> (0 + 108) / 2048]16/12/06 03:19:40 WARN TaskSetManager: Lost task 78.0 in stage 1.0 (TID 148, 136.243.58.230): java.lang.OutOfMemoryError: Java heap space
at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:657)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$4.apply(limit.scala:143)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$4.apply(limit.scala:142)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Links to logs:
Spark-shell log: https://gist.github.com/lakomiec/e53f8e3f0a7227f751978f5ad95b6c52
Content of compute-top-unique-users.scala: https://gist.github.com/lakomiec/23e221131554fc9e726f7d6cdc5b88b5
Exception on worker node: https://gist.github.com/lakomiec/560ab486eed981fd864086189afb413e


... one additional thing to add.

We tried:

content = content.persist(StorageLevel.MEMORY_AND_DISK)

but that didn't seem to have any impact...

-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile



credentials are not hiding on a jdbc query

2016-12-06 Thread Cesar
Related to
https://issues.apache.org/jira/browse/SPARK-12504?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22credentials%20jdbc%22



Is there a way to override the explain command on that class without
updating Spark? My Spark version is 1.6 and it is very hard for me to upgrade,
so I am looking more for a hack kind of solution.



Thanks a lot !
-- 
Cesar Flores


[Spark SQL]: Dataset Encoder equivalent for pre 1.6.0 releases?

2016-12-06 Thread Denis Papathanasiou
I have a case class named "Computed", and I'd like to be able to encode
all the Row objects in the DataFrame like this:

def myEncoder (df: DataFrame): Dataset[Computed] =
  df.as(Encoders.bean(classOf[Computed]))

This works just fine with the latest version of spark, but I'm forced
to use version 1.5.1, which has neither Dataset nor Encoders.

The alternative seems to be iterating every Row, doing a .get() or
.getAs() on that data, combined with a set on the associated attribute
in Computed.

Is there any other way?
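
For what it's worth, a hedged alternative sketch for Spark 1.5.x (an
illustration; the fields of Computed are assumed here): convert each Row by
name into the case class, yielding an RDD[Computed] rather than a Dataset.

case class Computed(id: String, score: Double)   // hypothetical fields, for illustration

val computedRdd = df.select("id", "score").map { row =>
  Computed(row.getAs[String]("id"), row.getAs[Double]("score"))
}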


[no subject]

2016-12-06 Thread ayan guha
Hi

We are generating some big model objects


> hdfs dfs -du -h /myfolder
325      975      /myfolder/__ORCHMETA__
1.7 M    5.0 M    /myfolder/model
185.3 K  555.9 K  /myfolder/predict

The issue I am facing while loading is

Error in .jcall("com/oracle/obx/df/OBXSerializer", returnSig =
"Ljava/lang/Object;",  :
  java.io.StreamCorruptedException: invalid type code: 00
Calls: orch.load.model ->  -> .jcall -> .jcheck -> .Call
Execution halted


As you can guess, it is from Oracle's R for Hadoop (ORAAH) product. I am
going to raise a ticket with Oracle, but thought I would check here as well in
case there are any solutions available.



-- 
Best Regards,
Ayan Guha


MLlib to Compute boundaries of a rectangle given random points on its Surface

2016-12-06 Thread Pradeep Gaddam
Hello,

Can someone please let me know if it is possible to construct a surface (for
example, a rectangle) given random points on its surface using Spark MLlib?
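
If the rectangle is axis-aligned, a minimal sketch (an assumption of mine,
not from the original mail) is that the bounding box of the sample points
approximates its boundaries, so plain DataFrame aggregation is enough and no
MLlib model is needed:

import org.apache.spark.sql.functions.{max, min}

// points: DataFrame with Double columns "x" and "y" (hypothetical layout)
val bounds = points.agg(
  min("x").as("xMin"), max("x").as("xMax"),
  min("y").as("yMin"), max("y").as("yMax")
)
bounds.show()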

Thanks
Pradeep Gaddam






Re: Livy with Spark

2016-12-06 Thread Mich Talebzadeh
Thanks Richard.

I saw your question in the above blog:

How does Livy proxy the user? Per task? Do you know how quotas are assigned
to users, like how do you stop one Livy user from using all of the
resources available to the Executors?

My points are:


   1. I still don't understand how quotas are assigned to users. Is that done
   by YARN?
   2. What will happen if more than one Livy instance is running on the same
   cluster, all controlled by the same YARN? How are resources allocated?

cheers


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 5 December 2016 at 14:50, Richard Startin 
wrote:

> There is a great write up on Livy at
> http://henning.kropponline.de/2016/11/06/
>
> On 5 Dec 2016, at 14:34, Mich Talebzadeh 
> wrote:
>
> Hi,
>
> Has there been any experience using Livy with Spark to share multiple
> Spark contexts?
>
> thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>


Re: [GraphX] Extreme scheduler delay

2016-12-06 Thread Sean Owen
(For what it is worth, I happened to look into this with Anton earlier and
am also pretty convinced it's related to GraphX rather than the app. It's
somewhat difficult to debug what gets sent in the closure AFAICT.)
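
(A common cause of the "task of very large size" warning quoted below is a
large object captured in the task closure; a hedged sketch, not from the
thread and with hypothetical data, of broadcasting the matrix instead:)

import org.apache.spark.graphx.Edge

val matrix: Array[Array[Double]] = Array.fill(2000, 2000)(1.0)   // hypothetical data
val matrixBc = sc.broadcast(matrix)
val cols = matrix(0).length

val edges = sc.parallelize(0 until matrix.length - 1).flatMap { i =>
  val m = matrixBc.value                                          // read via the broadcast inside tasks
  (0 until cols).map { j =>
    Edge((i * cols + j).toLong, ((i + 1) * cols + j).toLong, m(i)(j))
  }
}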

On Tue, Dec 6, 2016 at 7:49 PM AntonIpp  wrote:

> Hi everyone,
>
> I have a small Scala test project which uses GraphX and for some reason has
> extreme scheduler delay when executed on the cluster. The problem is not
> related to the cluster configuration, as other GraphX applications run
> without any issue.
> I have attached the source code ( MatrixTest.scala
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n28162/MatrixTest.scala
> >
> ), it creates a sort of a  GraphGenerators.gridGraph
> <
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.util.GraphGenerators$
> >
> (but with diagonal edges too) using data from a matrix inside the Map
> class.
> There are in reality only 4 lines related to GraphX itself: creating a
> VertexRDD, creating an EdgeRDD, creating a Graph and then calling
> graph.edges.count.
> As you can see on the  Spark History Server
> <
> http://cdhdns-mn0.westeurope.cloudapp.azure.com:18088/history/application_1480677653852_0050/jobs/
> >
> , the task has very significant scheduler delay. There is also the
> following
> warning in the logs (I have attached them too:  MatrixTest.log
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n28162/MatrixTest.log
> >
> ) : "WARN scheduler.TaskSetManager: Stage 0 contains a task of very large
> size (2905 KB). The maximum recommended task size is 100 KB."
> This also happens with .aggregateMessages.collect and Pregel. I have tested
> with Spark 1.6 and 2.0, different levels of parallelism, different number
> of
> executors, etc but the scheduler delay is still there and grows more and
> more extreme as the number of vertices and edges grows.
>
> Does anyone have any idea as to what could be the source of the issue?
> Thank you!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Extreme-scheduler-delay-tp28162.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Writing DataFrame filter results to separate files

2016-12-06 Thread Everett Anderson
On Mon, Dec 5, 2016 at 5:33 PM, Michael Armbrust 
wrote:

> 1. In my case, I'd need to first explode my data by ~12x to assign each
>> record to multiple 12-month rolling output windows. I'm not sure Spark SQL
>> would be able to optimize this away, combining it with the output writing
>> to do it incrementally.
>>
>
> You are right, but I wouldn't worry about the RAM use.  If implemented
> properly (or if you just use the builtin window function), it should all be
> pipelined.
>


Very cool! Will give it a go. I'm still on Spark 1.6.x so hadn't seen that
function, either!
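
For the archive, a minimal sketch of that built-in window function (my
illustration, Spark 2.x API; the column name and durations are assumptions):

import org.apache.spark.sql.functions.{col, window}

// events: DataFrame with a timestamp column "eventTime" (assumed layout)
val rolling = events
  .groupBy(window(col("eventTime"), "365 days", "30 days"))   // ~12-month windows sliding monthly
  .count()

rolling.select(col("window.start"), col("window.end"), col("count")).show(false)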


>
>
>> 2. Wouldn't each partition -- window in my case -- be shuffled to a
>> single machine and then written together as one output shard? For a large
>> amount of data per window, that seems less than ideal.
>>
>
> Oh sorry, I thought you wanted one file per value.  If you drop the
> repartition then it won't shuffle, but will just write in parallel on
> each machine.
>

Thanks!


Re: get corrupted rows using columnNameOfCorruptRecord

2016-12-06 Thread Michael Armbrust
.where("xxx IS NOT NULL") will give you the rows that couldn't be parsed.

On Tue, Dec 6, 2016 at 6:31 AM, Yehuda Finkelstein <
yeh...@veracity-group.com> wrote:

> Hi all
>
>
>
> I’m trying to parse json using existing schema and got rows with NULL’s
>
> //get schema
>
> val df_schema = spark.sqlContext.sql("select c1,c2,…cn from t1 limit 1")
>
> //read json file
>
> val f = sc.textFile("/tmp/x")
>
> //load json into data frame using schema
>
> var df = spark.sqlContext.read.option("columnNameOfCorruptRecord","
> xxx").option("mode","PERMISSIVE").schema(df_schema.schema).json(f)
>
>
>
> The documentation says that you can query the corrupted rows via this
> column: columnNameOfCorruptRecord
>
> "columnNameOfCorruptRecord (default is the value specified in
> spark.sql.columnNameOfCorruptRecord): allows renaming the new field
> having malformed string created by PERMISSIVE mode. This overrides
> spark.sql.columnNameOfCorruptRecord."
>
>
>
> The question is how to fetch those corrupted rows ?
>
>
>
>
>
> Thanks
>
> Yehuda
>
>
>
>
>


Re: Spark Streaming - join streaming and static data

2016-12-06 Thread Cody Koeninger
You do not need recent versions of spark, kafka, or structured
streaming in order to do this.  Normal DStreams are sufficient.

You can parallelize your static data from the database to an RDD, and
there's a join method available on RDDs.  Transforming a single given
timestamp line into multiple lines with modified timestamps can be
done using flatMap.
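
For illustration, a hedged sketch of that DStream approach (not code from
this reply; variable names and types are assumptions):

import java.sql.Timestamp

// staticRdd: RDD[(Int, (Int, Double))] = (id, (minute, value)), loaded once from the database
// messages:  DStream[(Int, Timestamp)]  = (id, eventTimestamp) parsed from the Kafka JSON

val result = messages.transform { batch =>
  batch.join(staticRdd).map { case (id, (ts, (minute, value))) =>
    (new Timestamp(ts.getTime + minute * 60000L), value)   // shift the timestamp by N minutes
  }
}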

On Tue, Dec 6, 2016 at 11:11 AM, Burak Yavuz  wrote:
> Hi Daniela,
>
> This is trivial with Structured Streaming. If your Kafka cluster is 0.10.0
> or above, you may use Spark 2.0.2 to create a Streaming DataFrame from
> Kafka, and then also create a DataFrame using the JDBC connection, and you
> may join those. In Spark 2.1, there's support for a function called
> "from_json", which should also help you easily parse your messages incoming
> from Kafka.
>
> Best,
> Burak
>
> On Tue, Dec 6, 2016 at 2:16 AM, Daniela S  wrote:
>>
>> Hi
>>
>> I have some questions regarding Spark Streaming.
>>
>> I receive a stream of JSON messages from Kafka.
>> The messages consist of a timestamp and an ID.
>>
>> timestamp           ID
>> 2016-12-06 13:00    1
>> 2016-12-06 13:40    5
>> ...
>>
>> In a database I have values for each ID:
>>
>> ID   minute  value
>> 1 0   3
>> 1 1   5
>> 1 2   7
>> 1 3   8
>> 5 0   6
>> 5 1   6
>> 5 2   8
>> 5 3   5
>> 5 4   6
>>
>> So I would like to join each incoming JSON message with the corresponding
>> values. It should look as follows:
>>
>> timestamp           ID   minute  value
>> 2016-12-06 13:00    1    0       3
>> 2016-12-06 13:00    1    1       5
>> 2016-12-06 13:00    1    2       7
>> 2016-12-06 13:00    1    3       8
>> 2016-12-06 13:40    5    0       6
>> 2016-12-06 13:40    5    1       6
>> 2016-12-06 13:40    5    2       8
>> 2016-12-06 13:40    5    3       5
>> 2016-12-06 13:40    5    4       6
>> ...
>>
>> Then I would like to add the minute values to the timestamp. I only need
>> the computed timestamp and the values. So the result should look as follows:
>>
>> timestamp   value
>> 2016-12-06 13:00  3
>> 2016-12-06 13:01  5
>> 2016-12-06 13:02  7
>> 2016-12-06 13:03  8
>> 2016-12-06 13:40  6
>> 2016-12-06 13:41  6
>> 2016-12-06 13:42  8
>> 2016-12-06 13:43  5
>> 2016-12-06 13:44  6
>> ...
>>
>> Is this a possible use case for Spark Streaming? I thought I could join
>> the streaming data with the static data but I am not sure how to add the
>> minute values to the timestamp. Is this possible with Spark Streaming?
>>
>> Thank you in advance.
>>
>> Best regards,
>> Daniela
>>
>> - To
>> unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Monitoring the User Metrics for a long running Spark Job

2016-12-06 Thread Chawla,Sumit
Any pointers on this?

Regards
Sumit Chawla


On Mon, Dec 5, 2016 at 8:30 PM, Chawla,Sumit  wrote:

> An example implementation i found is : https://github.com/groupon/
> spark-metrics
>
> Anyone has any experience using this?  I am more interested in something
> for Pyspark specifically.
>
> The above link pointed to - https://github.com/apache/
> spark/blob/master/conf/metrics.properties.template.  I need to spend some
> time reading it, but any quick pointers will be appreciated.
>
>
>
> Regards
> Sumit Chawla
>
>
> On Mon, Dec 5, 2016 at 8:17 PM, Chawla,Sumit 
> wrote:
>
>> Hi Manish
>>
>> I am specifically looking for something similar to following:
>>
>>  https://ci.apache.org/projects/flink/flink-docs-release-1.
>> 1/apis/common/index.html#accumulators--counters.
>>
>> Flink has this concept of Accumulators, where user can keep its custom
>> counters etc.  While the application is executing these counters are
>> queryable through REST API provided by Flink Monitoring Backend.  This way
>> you don't have to wait for the program to complete.
>>
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Mon, Dec 5, 2016 at 5:53 PM, manish ranjan 
>> wrote:
>>
>>> http://spark.apache.org/docs/latest/monitoring.html
>>>
>>> You can even install tools like dstat, iostat, and iotop; *collectd* can
>>> provide fine-grained profiling on individual nodes.
>>>
>>> If you are using Mesos as Resource Manager , mesos exposes metrics as
>>> well for the running job.
>>>
>>> Manish
>>>
>>> ~Manish
>>>
>>>
>>>
>>> On Mon, Dec 5, 2016 at 4:17 PM, Chawla,Sumit 
>>> wrote:
>>>
 Hi All

 I have a long running job which takes hours and hours to process data.
 How can I monitor the operational efficiency of this job?  I am interested
 in something like Storm/Flink style user metrics/aggregators, which I can
 monitor while my job is running.  Using these metrics I want to monitor
 per-partition performance in processing items.  As of now, the only way for me
 to get these metrics is when the job finishes.

 One possibility is that Spark could flush the metrics to an external system
 every few seconds, and thus use an external system to monitor these
 metrics.  However, I wanted to see if Spark supports any such use case
 OOB.


 Regards
 Sumit Chawla


>>>
>>
>
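
For the archive, one rough option in this direction is named accumulators,
which appear per stage in the Spark web UI while the job runs (a hedged
sketch, Spark 1.x API; rdd and isValid are hypothetical):

val processed = sc.accumulator(0L, "processedRecords")
val failed    = sc.accumulator(0L, "failedRecords")

rdd.foreach { record =>
  if (isValid(record)) processed += 1L else failed += 1L   // isValid is a hypothetical check
}
// While the job runs, "processedRecords" / "failedRecords" are visible on the
// stage details page of the Spark UI, and stage data is also exposed over its REST API.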


Re: driver in queued state and not started

2016-12-06 Thread Michael Gummelt
Client mode or cluster mode?

On Mon, Dec 5, 2016 at 10:05 PM, Yu Wei  wrote:

> Hi Guys,
>
>
> I tried to run spark on mesos cluster.
>
> However, when I tried to submit jobs via spark-submit. The driver is in
> "Queued state" and not started.
>
>
> Which should I check?
>
>
>
> Thanks,
>
> Jared, (韦煜)
> Software developer
> Interested in open source software, big data, Linux
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


efficient filtering on a dataframe

2016-12-06 Thread Koert Kuipers
i have a dataframe on which i need to run many queries that start with a
filter on a column x.

currently i write the dataframe out to parquet datasource partitioned by
field x, after which i repeatedly read the datasource back in from parquet.
the queries are efficient because the filter gets pushed into the
datasource, which filters out directories, so only a subset of the data
gets read for every query.

how can i achieve the same efficiency without going to datasource and back?
the round trip feels artificial and unnecessary.
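
one in-memory alternative (a hedged sketch, assuming the filter column has
few distinct values) is to pre-split the dataframe by x and cache each slice,
so later queries go straight to the relevant cached subset:

val keys = df.select("x").distinct().collect().map(_.get(0))

val byKey: Map[Any, org.apache.spark.sql.DataFrame] =
  keys.map(k => k -> df.filter(df("x") === k).cache()).toMap

// a query that starts with a filter on x then uses the matching slice, e.g.:
// byKey(someKey).groupBy("y").count()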


Re: Spark Streaming - join streaming and static data

2016-12-06 Thread Burak Yavuz
Hi Daniela,

This is trivial with Structured Streaming. If your Kafka cluster is 0.10.0
or above, you may use Spark 2.0.2 to create a Streaming DataFrame from
Kafka, and then also create a DataFrame using the JDBC connection, and you
may join those. In Spark 2.1, there's support for a function called
"from_json", which should also help you easily parse your messages incoming
from Kafka.
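
For illustration, a hedged sketch of that approach (it assumes the
spark-sql-kafka-0-10 package is available; the broker, topic, jdbcUrl, table
name and connectionProps are hypothetical):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StructType, TimestampType}

val msgSchema = new StructType()
  .add("timestamp", TimestampType)
  .add("ID", IntegerType)

// Streaming side: parse the Kafka JSON with from_json (Spark 2.1)
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .select(from_json(col("value").cast("string"), msgSchema).as("msg"))
  .select("msg.*")

// Static side: the per-ID minute/value table from the database
val lookup = spark.read.jdbc(jdbcUrl, "id_values", connectionProps)

// Stream-static join on ID; the minute offsets can then be added to the timestamp
val joined = stream.join(lookup, "ID")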

Best,
Burak

On Tue, Dec 6, 2016 at 2:16 AM, Daniela S  wrote:

> Hi
>
> I have some questions regarding Spark Streaming.
>
> I receive a stream of JSON messages from Kafka.
> The messages consist of a timestamp and an ID.
>
> timestamp           ID
> 2016-12-06 13:00    1
> 2016-12-06 13:40    5
> ...
>
> In a database I have values for each ID:
>
> ID   minute  value
> 1 0   3
> 1 1   5
> 1 2   7
> 1 3   8
> 5 0   6
> 5 1   6
> 5 2   8
> 5 3   5
> 5 4   6
>
> So I would like to join each incoming JSON message with the corresponding
> values. It should look as follows:
>
> timestamp           ID   minute  value
> 2016-12-06 13:00    1    0       3
> 2016-12-06 13:00    1    1       5
> 2016-12-06 13:00    1    2       7
> 2016-12-06 13:00    1    3       8
> 2016-12-06 13:40    5    0       6
> 2016-12-06 13:40    5    1       6
> 2016-12-06 13:40    5    2       8
> 2016-12-06 13:40    5    3       5
> 2016-12-06 13:40    5    4       6
> ...
>
> Then I would like to add the minute values to the timestamp. I only need
> the computed timestamp and the values. So the result should look as follows:
>
> timestamp   value
> 2016-12-06 13:00  3
> 2016-12-06 13:01  5
> 2016-12-06 13:02  7
> 2016-12-06 13:03  8
> 2016-12-06 13:40  6
> 2016-12-06 13:41  6
> 2016-12-06 13:42  8
> 2016-12-06 13:43  5
> 2016-12-06 13:44  6
> ...
>
> Is this a possible use case for Spark Streaming? I thought I could join
> the streaming data with the static data but I am not sure how to add the
> minute values to the timestamp. Is this possible with Spark Streaming?
>
> Thank you in advance.
>
> Best regards,
> Daniela
>
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org


get corrupted rows using columnNameOfCorruptRecord

2016-12-06 Thread Yehuda Finkelstein
Hi all



I’m trying to parse json using existing schema and got rows with NULL’s

//get schema

val df_schema = spark.sqlContext.sql("select c1,c2,…cn from t1 limit 1")

//read json file

val f = sc.textFile("/tmp/x")

//load json into data frame using schema

var df =
spark.sqlContext.read.option("columnNameOfCorruptRecord","xxx").option("mode","PERMISSIVE").schema(df_schema.schema).json(f)



The documentation says that you can query the corrupted rows via this
column: columnNameOfCorruptRecord

"columnNameOfCorruptRecord (default is the value specified in
spark.sql.columnNameOfCorruptRecord): allows renaming the new field having
malformed string created by PERMISSIVE mode. This overrides
spark.sql.columnNameOfCorruptRecord."



The question is how to fetch those corrupted rows ?





Thanks

Yehuda


Re: How to compute the recall and F1-score in Linear Regression based model

2016-12-06 Thread Sean Owen
There is no such thing as multiclass regression. These metrics are for
classification problems and don't have meaning for regression.
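
For a regression model the analogous MLlib metrics are RMSE, MAE and R2; a
minimal sketch (an illustration, not from the thread), reusing the
(prediction, label) pairs from the earlier code:

import org.apache.spark.mllib.evaluation.RegressionMetrics

// predictionAndLabels: RDD[(Double, Double)] of (prediction, label)
val metrics = new RegressionMetrics(predictionAndLabels)
println(s"RMSE = ${metrics.rootMeanSquaredError}")
println(s"MAE  = ${metrics.meanAbsoluteError}")
println(s"R2   = ${metrics.r2}")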

On Tue, Dec 6, 2016 at 7:55 PM Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi Sean,
>
> According to Spark documentation, precision, recall, F1, true positive
> rate, false positive rate etc. can be calculated using the MultiMetrics
> evaluator for the multiclass classifiers also. For example in *Random
> Forest *based classifier or regressor:
>
> // Get evaluation metrics.
> MulticlassMetrics metrics = new
> MulticlassMetrics(predictionAndLabels.rdd());
> //System.out.println(metrics.confusionMatrix());
>// System.out.println(metrics.confusionMatrix());
> double precision = metrics.precision(metrics.labels()[0]);
> double recall = metrics.recall(metrics.labels()[0]);
> double f_measure = metrics.fMeasure();
> double query_label = 2001; // it's a label or class for prediction
> double TP = metrics.truePositiveRate(query_label);
> double FP = metrics.falsePositiveRate(query_label);
> double WTP = metrics.weightedTruePositiveRate();
> double WFP =  metrics.weightedFalsePositiveRate();
>
> Where the related performance measure statistics is calculated and stored
> in '*predictionAndLabels*' RDD as follows:
> JavaRDD> predictionAndLabels = testData.map(
> new Function>() {
>   public Tuple2 call(LabeledPoint p) {
> Double prediction = model.predict(p.features());
> return new Tuple2(prediction, p.label());
>   }
> }
>   );
> And *'model'* is a Random Forest model instance trained with multiclass
> regression or classification dataset.
>
> The current implementation of Logistic Regression supports only the binary
> classification. But, Linear Regression supports/works on the dataset having
> multiclass.
>
> I was wondering if it's possible to compute the similar metrics using the
> Linear Regression based model for multiclass or binary class dataset.
>
>
>
> Regards,
> _
> *Md. Rezaul Karim* BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>
> On 6 December 2016 at 11:37, Sean Owen  wrote:
>
> Precision, recall and F1 are metrics for binary classifiers, not
> regression models. Can you clarify what you intend to do?
>
> On Tue, Dec 6, 2016, 19:14 Md. Rezaul Karim <
> rezaul.ka...@insight-centre.org> wrote:
>
> Hi Folks,
>
> I have the following code snippet in Java that can calculate the precision
> in Linear Regressor based model.
>
> Dataset predictions = model.transform(testData);
> long count = 0;
>  for (Row r : predictions.select("features", "label",
> "prediction").collectAsList()) {
>count++;
> }
>   System.out.println("precision: " + (double) (count * 100) /
> predictions.count());
>
> Now, I would like to compute other evaluation metrics like *Recall *and 
> *F1-score
> *etc. How could I do that?
>
>
>
> Regards,
> _
> *Md. Rezaul Karim* BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>
>
>


Re: How to compute the recall and F1-score in Linear Regression based model

2016-12-06 Thread Md. Rezaul Karim
Hi Sean,

According to Spark documentation, precision, recall, F1, true positive
rate, false positive rate etc. can be calculated using the MultiMetrics
evaluator for the multiclass classifiers also. For example in *Random
Forest *based classifier or regressor:

// Get evaluation metrics.
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
// System.out.println(metrics.confusionMatrix());
double precision = metrics.precision(metrics.labels()[0]);
double recall = metrics.recall(metrics.labels()[0]);
double f_measure = metrics.fMeasure();
double query_label = 2001; // it's a label or class for prediction
double TP = metrics.truePositiveRate(query_label);
double FP = metrics.falsePositiveRate(query_label);
double WTP = metrics.weightedTruePositiveRate();
double WFP = metrics.weightedFalsePositiveRate();

Where the related performance measure statistics are calculated and stored
in the '*predictionAndLabels*' RDD as follows:
JavaRDD<Tuple2<Object, Object>> predictionAndLabels = testData.map(
    new Function<LabeledPoint, Tuple2<Object, Object>>() {
      public Tuple2<Object, Object> call(LabeledPoint p) {
        Double prediction = model.predict(p.features());
        return new Tuple2<Object, Object>(prediction, p.label());
      }
    }
);
And *'model'* is a Random Forest model instance trained with multiclass
regression or classification dataset.

The current implementation of Logistic Regression supports only the binary
classification. But, Linear Regression supports/works on the dataset having
multiclass.

I was wondering if it's possible to compute the similar metrics using the
Linear Regression based model for multiclass or binary class dataset.



Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html


On 6 December 2016 at 11:37, Sean Owen  wrote:

> Precision, recall and F1 are metrics for binary classifiers, not
> regression models. Can you clarify what you intend to do?
>
> On Tue, Dec 6, 2016, 19:14 Md. Rezaul Karim  org> wrote:
>
>> Hi Folks,
>>
>> I have the following code snippet in Java that can calculate the
>> precision in Linear Regressor based model.
>>
>> Dataset predictions = model.transform(testData);
>> long count = 0;
>>  for (Row r : predictions.select("features", "label",
>> "prediction").collectAsList()) {
>>count++;
>> }
>>   System.out.println("precision: " + (double) (count * 100) /
>> predictions.count());
>>
>> Now, I would like to compute other evaluation metrics like *Recall *and 
>> *F1-score
>> *etc. How could I do that?
>>
>>
>>
>> Regards,
>> _
>> *Md. Rezaul Karim* BSc, MSc
>> PhD Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>> 
>>
>


[GraphX] Extreme scheduler delay

2016-12-06 Thread AntonIpp
Hi everyone,

I have a small Scala test project which uses GraphX and for some reason has
extreme scheduler delay when executed on the cluster. The problem is not
related to the cluster configuration, as other GraphX applications run
without any issue.
I have attached the source code (MatrixTest.scala:
http://apache-spark-user-list.1001560.n3.nabble.com/file/n28162/MatrixTest.scala);
it creates a sort of a GraphGenerators.gridGraph
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.util.GraphGenerators$),
but with diagonal edges too, using data from a matrix inside the Map class.
There are in reality only 4 lines related to GraphX itself: creating a
VertexRDD, creating an EdgeRDD, creating a Graph and then calling
graph.edges.count. 
As you can see on the Spark History Server
(http://cdhdns-mn0.westeurope.cloudapp.azure.com:18088/history/application_1480677653852_0050/jobs/),
the task has very significant scheduler delay. There is also the following
warning in the logs (I have attached them too: MatrixTest.log,
http://apache-spark-user-list.1001560.n3.nabble.com/file/n28162/MatrixTest.log):
"WARN scheduler.TaskSetManager: Stage 0 contains a task of very large
size (2905 KB). The maximum recommended task size is 100 KB."
This also happens with .aggregateMessages.collect and Pregel. I have tested
with Spark 1.6 and 2.0, different levels of parallelism, different number of
executors, etc but the scheduler delay is still there and grows more and
more extreme as the number of vertices and edges grows.

Does anyone have any idea as to what could be the source of the issue?
Thank you!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Extreme-scheduler-delay-tp28162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to compute the recall and F1-score in Linear Regression based model

2016-12-06 Thread Sean Owen
Precision, recall and F1 are metrics for binary classifiers, not regression
models. Can you clarify what you intend to do?

On Tue, Dec 6, 2016, 19:14 Md. Rezaul Karim 
wrote:

> Hi Folks,
>
> I have the following code snippet in Java that can calculate the precision
> in Linear Regressor based model.
>
> Dataset predictions = model.transform(testData);
> long count = 0;
>  for (Row r : predictions.select("features", "label",
> "prediction").collectAsList()) {
>count++;
> }
>   System.out.println("precision: " + (double) (count * 100) /
> predictions.count());
>
> Now, I would like to compute other evaluation metrics like *Recall *and 
> *F1-score
> *etc. How could I do that?
>
>
>
> Regards,
> _
> *Md. Rezaul Karim* BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>


How to compute the recall and F1-score in Linear Regression based model

2016-12-06 Thread Md. Rezaul Karim
Hi Folks,

I have the following code snippet in Java that can calculate the precision
in Linear Regressor based model.

Dataset<Row> predictions = model.transform(testData);
long count = 0;
for (Row r : predictions.select("features", "label",
    "prediction").collectAsList()) {
  count++;
}
System.out.println("precision: " + (double) (count * 100) /
    predictions.count());

Now, I would like to compute other evaluation metrics like *Recall
*and *F1-score
*etc. How could I do that?



Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html



Spark Streaming - join streaming and static data

2016-12-06 Thread Daniela S
Hi

 

I have some questions regarding Spark Streaming.

 

I receive a stream of JSON messages from Kafka.

The messages consist of a timestamp and an ID.

 

timestamp                 ID

2016-12-06 13:00    1

2016-12-06 13:40    5

...

 

In a database I have values for each ID:

 

ID       minute      value

1         0               3

1         1               5

1         2               7

1         3               8

5         0               6

5         1               6

5         2               8

5         3               5

5         4               6

 

So I would like to join each incoming JSON message with the corresponding values. It should look as follows:

 

timestamp                 ID           minute      value


2016-12-06 13:00    1             0               3

2016-12-06 13:00    1             1               5          

2016-12-06 13:00    1             2               7

2016-12-06 13:00    1             3               8

2016-12-06 13:40    5             0               6

2016-12-06 13:40    5             1               6

2016-12-06 13:40    5             2               8

2016-12-06 13:40    5             3               5

2016-12-06 13:40    5             4               6

...

 

Then I would like to add the minute values to the timestamp. I only need the computed timestamp and the values. So the result should look as follows:

 


timestamp                   value


2016-12-06 13:00      3

2016-12-06 13:01      5          

2016-12-06 13:02      7

2016-12-06 13:03      8


2016-12-06 13:40      6

2016-12-06 13:41      6

2016-12-06 13:42      8

2016-12-06 13:43      5

2016-12-06 13:44      6


...



 

Is this a possible use case for Spark Streaming? I thought I could join the streaming data with the static data but I am not sure how to add the minute values to the timestamp. Is this possible with Spark Streaming?

 

Thank you in advance.

 

Best regards,

Daniela

 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Re: Re: how to add colum to dataframe

2016-12-06 Thread lk_spark
I now know the right way to do it:
val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
val df2 = 
df.withColumn("pa_bid",when(isnull($"url"),"".split("#")(0)).otherwise(split(split(col("url"),"_biz=")(1),
 "")(1)))
scala> df2.select("pa_bid","url").show
+++
|  pa_bid| url|
+++
|MjM5MjEyNTk2MA==|http://mp.weixin|
|MzAxODIwMDcwNA==|http://mp.weixin|
|MzIzMjQ4NzQwOA==|http://mp.weixin|
|MzAwOTIxMTcyMQ==|http://mp.weixin|
|MzA3OTAyNzY2OQ==|http://mp.weixin|
|MjM5NDAzMDAwMA==|http://mp.weixin|
|MzAwMjE4MzU0Nw==|http://mp.weixin|
|MzA4NzcyNjI0Mw==|http://mp.weixin|
|MzI5OTE5Nzc5Ng==|http://mp.weixin|


2016-12-06 

lk_spark 



发件人:"lk_spark"
发送时间:2016-12-06 17:44
主题:Re: Re: how to add colum to dataframe
收件人:"Pankaj Wahane","user.spark"
抄送:

Thanks for the reply. I will look into how to use na.fill. I don't know how to 
get the value of the column and do operations like substr or split.

2016-12-06 

lk_spark 



发件人:Pankaj Wahane 
发送时间:2016-12-06 17:39
主题:Re: how to add colum to dataframe
收件人:"lk_spark","user.spark"
抄送:

You may want to try using df2.na.fill(…)
 
From: lk_spark 
Date: Tuesday, 6 December 2016 at 3:05 PM
To: "user.spark" 
Subject: how to add colum to dataframe
 
hi, all:
   my spark version is 2.0
   I have a parquet file with one column named url whose type is string. I want to get a
substring from the url and add it to the dataframe:
   val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
   val df2 = df.withColumn("pa_bid",when($"url".isNull,col("url").substr(3, 5)))
   df2.select("pa_bid","url").show
   +--++
|pa_bid| url|
+--++
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
 
Why is what I got null?
 
2016-12-06



lk_spark 

Re: Re: how to add colum to dataframe

2016-12-06 Thread lk_spark
Thanks for the reply. I will look into how to use na.fill. I don't know how to 
get the value of the column and do operations like substr or split.

2016-12-06 

lk_spark 



发件人:Pankaj Wahane 
发送时间:2016-12-06 17:39
主题:Re: how to add colum to dataframe
收件人:"lk_spark","user.spark"
抄送:

You may want to try using df2.na.fill(…)
 
From: lk_spark 
Date: Tuesday, 6 December 2016 at 3:05 PM
To: "user.spark" 
Subject: how to add colum to dataframe
 
hi, all:
   my spark version is 2.0
   I have a parquet file with one column named url whose type is string. I want to get a
substring from the url and add it to the dataframe:
   val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
   val df2 = df.withColumn("pa_bid",when($"url".isNull,col("url").substr(3, 5)))
   df2.select("pa_bid","url").show
   +--++
|pa_bid| url|
+--++
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
 
Why is what I got null?
 
2016-12-06



lk_spark 

Re: how to add colum to dataframe

2016-12-06 Thread Pankaj Wahane
You may want to try using df2.na.fill(…)

From: lk_spark 
Date: Tuesday, 6 December 2016 at 3:05 PM
To: "user.spark" 
Subject: how to add colum to dataframe

hi, all:
   my spark version is 2.0
   I have a parquet file with one column named url whose type is string. I want to get a
substring from the url and add it to the dataframe:
   val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
   val df2 = df.withColumn("pa_bid",when($"url".isNull,col("url").substr(3, 5)))
   df2.select("pa_bid","url").show
   +--++
|pa_bid| url|
+--++
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|

Why is what I got null?

2016-12-06

lk_spark


how to add colum to dataframe

2016-12-06 Thread lk_spark
hi, all:
   my spark version is 2.0
   I have a parquet file with one column named url whose type is string. I want to get a
substring from the url and add it to the dataframe:
   val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
   val df2 = df.withColumn("pa_bid",when($"url".isNull,col("url").substr(3, 5)))
   df2.select("pa_bid","url").show
   +--++
|pa_bid| url|
+--++
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|

Why is what I got null?


2016-12-06


lk_spark 

doing streaming efficiently

2016-12-06 Thread Mendelson, Assaf
Hi,
I have a system that does streaming analysis over a long period of time, for example 
a sliding window of 24 hours every 15 minutes.
I have a batch process I need to convert to this streaming setup.
I am wondering how to do so efficiently.

I am currently building the streaming process, so I can use DStream or create a 
dataframe for each time period manually.
I know that if I have a groupby, Spark would cache the groupby and therefore 
only the new time period would be calculated.
My problem, however, is handling window functions.

Consider an example where I have a window function that counts the number of 
failed logins before a successful one in the last 2 hours. How would I convert 
it to streaming so it wouldn't be recalculated every 15 minutes from scratch?
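
One direction that might help (a hedged sketch, not a definitive answer; the
event shape is assumed) is keeping per-user state across batches with
mapWithState, so the failure count is updated incrementally rather than
recomputed every 15 minutes:

import org.apache.spark.streaming.{State, StateSpec}

// logins: DStream[(String, (Long, Boolean))] = (user, (eventTimeMs, success))
def track(user: String, event: Option[(Long, Boolean)],
          state: State[List[Long]]): Option[(String, Int)] = {
  val twoHoursMs = 2 * 60 * 60 * 1000L
  val now = event.map(_._1).getOrElse(System.currentTimeMillis)
  val recentFailures = state.getOption().getOrElse(Nil).filter(_ > now - twoHoursMs)
  event match {
    case Some((ts, false)) => state.update(ts :: recentFailures); None                // failed login: remember it
    case Some((_, true))   => state.update(Nil); Some((user, recentFailures.size))    // success: emit count, reset
    case None              => state.update(recentFailures); None                      // no event: just prune old entries
  }
}

// val failureCounts = logins.mapWithState(StateSpec.function(track _))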

Thanks,
Assaf.