Re: Spark structured streaming: periodically refresh static data frame

2018-02-14 Thread Appu K
TD,

Thanks a lot for the quick reply :)


Did I understand it right that, in the main thread, I won't be able to use
outStream.awaitTermination() to wait for the termination of the context,
since I'll be stopping the query from inside another thread?

What would be a good approach to keep the main app long-running if I’ve to
restart queries?

Should I just wait for 2.3, where I'll be able to join two structured
streams (if the release is just a few weeks away)?

Appreciate all the help!

thanks
Appu



On 14 February 2018 at 4:41:52 PM, Tathagata Das (tathagata.das1...@gmail.com) wrote:

Let me fix my mistake :)
What I suggested in that earlier thread does not work. The streaming query
that joins a streaming dataset with a batch view does not correctly pick
up changes when the view is updated. It works only when you restart the
query. That is:
- stop the query,
- recreate the dataframes, and
- start the query on the new dataframe using the same checkpoint location
as the previous query.

Note that you don't need to restart the whole process/cluster/application,
just the query within the same process/cluster/application. This should
be very fast (within a few seconds). So, unless you have latency SLAs of 1
second, you can periodically restart the query without restarting the
process.
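
A minimal sketch of that restart loop, in case it helps (the rate source,
the paths, and the 15-minute interval are placeholders of mine, not part
of the actual fix):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("periodic-static-refresh").getOrCreate()

// Hypothetical sketch: restart the query periodically so the re-read
// static DataFrame is picked up. All paths and sources are placeholders.
while (true) {
  val static = spark.read.parquet("/data/static")      // reload the static side
  val stream = spark.readStream.format("rate").load()
    .withColumnRenamed("value", "key")
  val query = stream.join(static, Seq("key"))
    .writeStream
    .format("parquet")
    .option("path", "/data/out")
    .option("checkpointLocation", "/chk/static-join")  // same checkpoint each restart
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()
  query.awaitTermination(15 * 60 * 1000L)              // run for ~15 minutes ...
  query.stop()                                         // ... then stop and loop
}

Running the loop like this in the main thread also keeps the application
alive without a blocking awaitTermination().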

Apologies for my misdirections in that earlier thread. Hope this helps.

TD

On Wed, Feb 14, 2018 at 2:57 AM, Appu K  wrote:

> More specifically,
>
> Quoting TD from the previous thread:
> "Any streaming query that joins a streaming dataframe with the view will
> automatically start using the most updated data as soon as the view is
> updated"
>
> Wondering if I’m doing something wrong in
> https://gist.github.com/anonymous/90dac8efadca3a69571e619943ddb2f6
>
> My streaming dataframe is not using the updated data, even though the view
> is updated!
>
> Thank you
>
>
> On 14 February 2018 at 2:54:48 PM, Appu K (kut...@gmail.com) wrote:
>
> Hi,
>
> I had followed the instructions from the thread
> https://mail-archives.apache.org/mod_mbox/spark-user/201704.mbox/%3CD1315D33-41cd-4ba3-8b77-0879f3669...@qvantel.com%3E
> while trying to reload a static data frame periodically that gets joined
> to a structured streaming query.
>
> However, the streaming query results do not reflect the data from the
> refreshed static data frame.
>
> Code is here:
> https://gist.github.com/anonymous/90dac8efadca3a69571e619943ddb2f6
>
> I’m using Spark 2.2.1. Any pointers would be highly helpful.
>
> Thanks a lot
>
> Appu
>
>


Re: Spark structured streaming: periodically refresh static data frame

2018-02-14 Thread Appu K
More specifically,

Quoting TD from the previous thread:
"Any streaming query that joins a streaming dataframe with the view will
automatically start using the most updated data as soon as the view is
updated"

Wondering if I’m doing something wrong in
https://gist.github.com/anonymous/90dac8efadca3a69571e619943ddb2f6

My streaming dataframe is not using the updated data, even though the view
is updated!

Thank you


On 14 February 2018 at 2:54:48 PM, Appu K (kut...@gmail.com) wrote:

Hi,

I had followed the instructions from the thread
https://mail-archives.apache.org/mod_mbox/spark-user/201704.mbox/%3cd1315d33-41cd-4ba3-8b77-0879f3669...@qvantel.com%3E
while trying to reload a static data frame periodically that gets joined to a
structured streaming query.

However, the streaming query results do not reflect the data from the
refreshed static data frame.

Code is here
https://gist.github.com/anonymous/90dac8efadca3a69571e619943ddb2f6

I’m using Spark 2.2.1. Any pointers would be highly helpful.

Thanks a lot

Appu


Spark structured streaming: periodically refresh static data frame

2018-02-14 Thread Appu K
Hi,

I had followed the instructions from the thread
https://mail-archives.apache.org/mod_mbox/spark-user/201704.mbox/%3cd1315d33-41cd-4ba3-8b77-0879f3669...@qvantel.com%3E
while trying to reload a static data frame periodically that gets joined to a
structured streaming query.

However, the streaming query results do not reflect the data from the
refreshed static data frame.

Code is here
https://gist.github.com/anonymous/90dac8efadca3a69571e619943ddb2f6

I’m using Spark 2.2.1. Any pointers would be highly helpful.

Thanks a lot

Appu


Re: Closing resources in the executor

2017-02-02 Thread Appu K
https://mid.mail-archive.com/search?l=user@spark.apache.org&q=subject:%22Executor+shutdown+hook+and+initialization%22&o=newest&f=1

I see this thread, where it is mentioned that per-partition resource
management is recommended over global state (within an executor).
What would be the way to achieve this with DataFrames?

Is a shutdown hook the only solution right now?
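
For context, the per-partition pattern for an action would be a minimal
sketch like this (RedisClient here is a made-up stand-in, not a real
client API):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
val df = spark.read.parquet("/data/events")            // placeholder input

// Hypothetical sketch: open a resource once per partition, use it for
// every row, and close it in a finally block.
df.foreachPartition { rows =>
  val client = new RedisClient("redis-host", 6379)     // one connection per partition
  try {
    rows.foreach(row => client.set(row.getString(0), row.getString(1)))
  } finally {
    client.close()                                     // released even on failure
  }
}

What I'm unsure about is the right equivalent when the lookup happens
inside a DataFrame transformation, where the resource must stay open until
the returned iterator is fully consumed.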

thanks
sajith


On 2 February 2017 at 11:58:27 AM, Appu K (kut...@gmail.com) wrote:



What would be the recommended way to close resources opened or shared by
executors?

A few use cases

#1) Let's say the enrichment process needs to convert IP / lat+long to
city/country. To achieve this, executors could open a file in HDFS and
build a map, or use a memory-mapped file - the implementation could be a
transient lazy val singleton or something similar. Now, the UDF defined
would perform lookups on these data structures and return geo data.

#2) Let's say there is a need to do a lookup on a KV store like Redis from
the executor. Each executor would create a connection pool and provide
connections for tasks running in them to perform lookups.

In scenarios like this, when the executor is closed, what would be the best
way to close the open resources (streams etc.)?


Any pointers to places where I could read up a bit more about the best
practices around this would be highly appreciated!

thanks
appu


Closing resources in the executor

2017-02-01 Thread Appu K
What would be the recommended way to close resources opened or shared by
executors?

A few use cases

#1) Let's say the enrichment process needs to convert IP / lat+long to
city/country. To achieve this, executors could open a file in HDFS and
build a map, or use a memory-mapped file - the implementation could be a
transient lazy val singleton or something similar. Now, the UDF defined
would perform lookups on these data structures and return geo data.
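
A minimal sketch of that singleton idea (GeoIndex and the HDFS path are
made-up stand-ins; only the lazy val + shutdown-hook shape is the point):

import org.apache.spark.sql.functions.udf

// Hypothetical sketch: a per-executor singleton that loads a lookup index
// lazily and registers a JVM shutdown hook to release it.
object GeoLookup {
  lazy val index: GeoIndex = {
    val idx = GeoIndex.loadFrom("hdfs:///ref/geo.dat")  // built once per executor JVM
    sys.addShutdownHook(idx.close())                    // released when the JVM exits
    idx
  }
}

// The UDF touches only the singleton; the first call on each executor
// triggers the load.
val toCity = udf((ip: String) => GeoLookup.index.city(ip))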

#2) Let's say there is a need to do a lookup on a KV store like Redis from
the executor. Each executor would create a connection pool and provide
connections for tasks running in them to perform lookups.

In scenarios like this, when the executor is closed, what would be the best
way to close the open resources (streams etc.)?


Any pointers to places where I could read up a bit more about the best
practices around this would be highly appreciated!

thanks
appu


log4j2 support in Spark

2017-01-15 Thread Appu K
Wondering whether it’ll be possible to do structured logging in Spark.

Adding "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.6.2" makes it
complain about multiple SLF4J bindings.
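
One workaround I've seen suggested - an assumption on my side, not
verified against this exact build - is to exclude the slf4j-log4j12
binding that Spark itself pulls in, e.g. in build.sbt:

// Hypothetical build.sbt sketch: keep log4j-slf4j-impl as the only SLF4J
// binding on the classpath. Versions are placeholders.
libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-core" % "2.1.0")
    .exclude("org.slf4j", "slf4j-log4j12"),
  "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.6.2",
  "org.apache.logging.log4j" % "log4j-api"        % "2.6.2",
  "org.apache.logging.log4j" % "log4j-core"       % "2.6.2"
)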

cheers
Appu


Tuning spark.executor.cores

2017-01-09 Thread Appu K
Are there use cases for which it is advisable to give spark.executor.cores
a value greater than the actual number of cores?


Re: Unable to explain the job kicked off for spark.read.csv

2017-01-09 Thread Appu K
That explains it.  Appreciate the help Hyukjin!

Thank you

On 9 January 2017 at 1:08:02 PM, Hyukjin Kwon (gurwls...@gmail.com) wrote:

Hi Appu,


I believe that textFile and filter came from...

https://github.com/apache/spark/blob/branch-2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala#L59-L61


It needs to read the first line even if the header is disabled and schema
inference is disabled, because it anyway needs a default string schema
with the same number of fields as the first row ("_c#", where # is the
field's position) when the schema is not specified manually.

I believe this job would not happen if the schema is explicitly given.
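
For instance, a minimal sketch (the column names and types are guesses
about the file, not something from this thread):

import org.apache.spark.sql.types._

// Hypothetical sketch: with an explicit schema, the first line need not be
// read at load time, so spark.read.csv alone should not trigger a job.
val schema = StructType(Seq(
  StructField("timestamp", StringType),
  StructField("site", StringType),
  StructField("requests", IntegerType)
))

val baseDF = spark.read
  .schema(schema)
  .csv("s3://l4b-d4t4/wikipedia/pageviews-by-second-tsv")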


I hope this is helpful


Thanks.

2017-01-09 0:11 GMT+09:00 Appu K :

> I was trying to create a base data frame in an EMR cluster from a CSV file
> using
>
> val baseDF = spark.read.csv("s3://l4b-d4t4/wikipedia/pageviews-by-second-tsv")
>
> Omitted the options to infer the schema and specify the header, just to
> understand what happens behind the scenes.
>
>
> The Spark UI shows that this kicked off a job with one stage. The stage
> shows that a filter was applied.
>
> Got curious a little bit about this. Is there any place where I could
> better understand why a filter was applied here, and why there was an
> action in this case?
>
>
> thanks
>
>



Re: Storage history in web UI

2017-01-08 Thread Appu K
@jacek - thanks a lot for the book

@joe - looks like the REST API also exposes a few things like
/applications/[app-id]/storage/rdd
/applications/[app-id]/storage/rdd/[rdd-id]
that might perhaps be of interest to you?
http://spark.apache.org/docs/latest/monitoring.html





On 9 January 2017 at 12:07:34 AM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,

A possible workaround: use a SparkListener and save the results to a custom
sink.

After all, the web UI is a mere bag of SparkListeners + excellent
visualizations.
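
A minimal sketch of that listener idea (the println stands in for whatever
custom sink you would write to):

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: record block (storage) updates as they happen, so
// the information survives after the application completes.
class StorageHistoryListener extends SparkListener {
  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
    val info = event.blockUpdatedInfo
    // Replace println with a write to your own store (file, DB, ...).
    println(s"${info.blockId} on ${info.blockManagerId.host}: " +
      s"mem=${info.memSize}B disk=${info.diskSize}B")
  }
}

val spark = SparkSession.builder.getOrCreate()
spark.sparkContext.addSparkListener(new StorageHistoryListener)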

Jacek

On 3 Jan 2017 4:14 p.m., "Joseph Naegele"  wrote:

Hi all,

Is there any way to observe Storage history in Spark, i.e. which RDDs were
cached and where, etc. after an application completes? It appears the
Storage tab in the History Server UI is useless.

Thanks
---
Joe Naegele
Grier Forensics





Spark UI - Puzzling “Input Size / Records” in Stage Details

2017-01-08 Thread Appu K
Was trying something basic to understand tasks, stages, and shuffles a bit
better in Spark. The dataset is 256 MB.

Tried this in Zeppelin:

val tmpDF = spark.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv("s3://l4b-d4t4/wikipedia/pageviews-by-second-tsv")
tmpDF.count

This kicked off 4 jobs -

   - 3 jobs for the first statement and
   - 1 job with 2 stages for tmpDF.count

The last stage of the job that corresponded to the count statement has some
puzzling data that I'm unable to explain.

   1. The Stage details section says "Input Size / Records: 186.6 MB / 720",
      while Aggregated Metrics by Executor says "Input Size / Records" is
      "186.6 MB / 5371292" (Stage Details UI).

   2. In the tasks list, one particular server,
      ip-x-x-x-60.eu-west-1.compute.internal, has 4 tasks with "0.0 B / 457130"
      as the value for "Input Size / Records" (Task Details UI).

I initially thought this was some local disk cache or something to do with
EMRFS. However, once I cached the dataframe and took the count again, it
showed "16.8 MB / 46" for all 16 tasks corresponding to the 16 partitions.

Any links/pointers to understand this a bit better would be highly helpful.


Unable to explain the job kicked off for spark.read.csv

2017-01-08 Thread Appu K
I was trying to create a base data frame in an EMR cluster from a CSV file
using

val baseDF =
spark.read.csv("s3://l4b-d4t4/wikipedia/pageviews-by-second-tsv")

Omitted the options to infer the schema and specify the header, just to
understand what happens behind the scenes.


The Spark UI shows that this kicked off a job with one stage. The stage
shows that a filter was applied.

Got curious a little bit about this. Is there any place where I could
better understand why a filter was applied here, and why there was an
action in this case?


thanks


groupByKey vs reduceByKey

2016-12-09 Thread Appu K
Hi,

Read somewhere that:

groupByKey() on an RDD disables map-side aggregation, as the aggregation
function (appending to a list) does not save any space.


However, from my understanding, using something like reduceByKey (or
combineByKey with a combiner function), we could reduce the data shuffled
around.

Wondering why map-side aggregation is disabled for groupByKey(), and why it
wouldn't save space at the executor where data is received after the
shuffle.
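
For reference, a minimal sketch of the contrast I mean (the sum is just a
stand-in aggregation):

import org.apache.spark.sql.SparkSession

val sc = SparkSession.builder.getOrCreate().sparkContext

// reduceByKey pre-combines values within each partition before the shuffle;
// groupByKey ships every value across the network as-is.
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// Map-side combine on: only per-partition partial sums are shuffled.
val sums = pairs.reduceByKey(_ + _)

// Map-side combine off: all values cross the shuffle, then are grouped.
val groups = pairs.groupByKey().mapValues(_.sum)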


cheers
Appu


Re: Managed memory leak : spark-2.0.2

2016-12-08 Thread Appu K
Hi,

I didn't hit any OOM issues - thanks for the pointer. I guess it'll be
safe to ignore, since TaskMemoryManager automatically releases the memory.

Just wondering what would have been the cause in this case - couldn't see
any task failures in the log, but there is some reference to
ExternalAppendOnlyMap acquiring memory which it perhaps didn't release:
———
2016-12-08 16:12:35,100 [Executor task launch worker-0]
(TaskMemoryManager.java:185) DEBUG Task 1 acquired 5.0 MB for
org.apache.spark.util.collection.ExternalAppendOnlyMap@28462783
2016-12-08 16:12:35,156 [Executor task launch worker-0]
(TaskMemoryManager.java:185) DEBUG Task 1 acquired 10.0 MB for
org.apache.spark.util.collection.ExternalAppendOnlyMap@28462783
2016-12-08 16:12:35,265 [Executor task launch worker-0]
(TaskMemoryManager.java:185) DEBUG Task 1 acquired 31.2 MB for
org.apache.spark.util.collection.ExternalAppendOnlyMap@28462783
2016-12-08 16:12:35,485 [Executor task launch worker-0]
(TaskMemoryManager.java:381) WARN leak 46.2 MB memory from
org.apache.spark.util.collection.ExternalAppendOnlyMap@28462783
———

thanks again

cheers
appu


On 8 December 2016 at 8:53:29 PM, Takeshi Yamamuro (linguin@gmail.com)
wrote:

Hi,

Did you run into any trouble from the memory leak?
I think we can ignore the message in most cases because TaskMemoryManager
automatically releases the memory. In fact, Spark downgraded the message
in SPARK-18557.
https://issues.apache.org/jira/browse/SPARK-18557

// maropu

On Thu, Dec 8, 2016 at 8:10 PM, Appu K  wrote:

> Hello,
>
> I've just run into an issue where the job is giving me a "Managed memory
> leak" warning with Spark version 2.0.2
>
> —
> 2016-12-08 16:31:25,231 [Executor task launch worker-0]
> (TaskMemoryManager.java:381) WARN leak 46.2 MB memory from
> org.apache.spark.util.collection.ExternalAppendOnlyMap@22719fb8
> 2016-12-08 16:31:25,232 [Executor task launch worker-0] (Logging.scala:66)
> WARN Managed memory leak detected; size = 48442112 bytes, TID = 1
> —
>
>
> The program itself is very basic, and it looks like take() is causing the
> issue.
>
> Program: https://gist.github.com/kutt4n/87cfcd4e794b1865b6f880412dd80bbf
> Debug Log: https://gist.github.com/kutt4n/ba3cf812dced34ceadc588856edc
>
>
> TaskMemoryManager.java:381 says that it's normal to see leaked memory if
> one of the tasks failed. In this case, from the debug log, it is not quite
> apparent which task failed, or the reason for the failure.
>
> When the TSV file itself is small, the issue doesn't exist. In this
> particular case, the file is 21 MB of clickstream data from Wikipedia,
> available at https://ndownloader.figshare.com/files/5036392
>
> Where could I read up more about managed memory leaks? Any pointers on
> what might be the issue would be highly helpful.
>
> thanks
> appu
>
>
>
>


--
---
Takeshi Yamamuro


Managed memory leak : spark-2.0.2

2016-12-08 Thread Appu K
Hello,

I've just run into an issue where the job is giving me a "Managed memory
leak" warning with Spark version 2.0.2

—
2016-12-08 16:31:25,231 [Executor task launch worker-0]
(TaskMemoryManager.java:381) WARN leak 46.2 MB memory from
org.apache.spark.util.collection.ExternalAppendOnlyMap@22719fb8
2016-12-08 16:31:25,232 [Executor task launch worker-0] (Logging.scala:66)
WARN Managed memory leak detected; size = 48442112 bytes, TID = 1
—


The program itself is very basic, and it looks like take() is causing the
issue.

Program: https://gist.github.com/kutt4n/87cfcd4e794b1865b6f880412dd80bbf
Debug Log: https://gist.github.com/kutt4n/ba3cf812dced34ceadc588856edc


TaskMemoryManager.java:381 says that it's normal to see leaked memory if
one of the tasks failed. In this case, from the debug log, it is not quite
apparent which task failed, or the reason for the failure.

When the TSV file itself is small, the issue doesn't exist. In this
particular case, the file is 21 MB of clickstream data from Wikipedia,
available at https://ndownloader.figshare.com/files/5036392

Where could I read up more about managed memory leaks? Any pointers on
what might be the issue would be highly helpful.

thanks
appu