RE: Make HTTP requests from within Spark

2015-06-03 Thread Mohammed Guller
The short answer is yes.

How you do it depends on a number of factors. Assuming you want to build an RDD 
from the responses and then analyze the responses using Spark core (not Spark 
Streaming), here is one simple way to do it:
1) Implement a class or function that connects to a web service and returns a 
list of responses. This code has no dependency on Spark. It will be the same 
whether you are using Spark or not. Obviously, you have to take memory and 
latency requirements into account.
2) Call sc.parallelize on the list obtained in step 1. 

This is not the most efficient way of doing it, but hopefully gives you an idea.
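
For illustration only, a minimal sketch of those two steps. The URLs, the
helper name fetchResponses, and the use of scala.io.Source are placeholder
assumptions, not a recommendation for a production HTTP client:

// Step 1: plain Scala, no Spark dependency; runs on the driver.
def fetchResponses(urls: Seq[String]): Seq[String] =
  urls.map(url => scala.io.Source.fromURL(url).mkString)

// Step 2: distribute the collected responses as an RDD for analysis.
val urls = (1 to 1000).map(i => s"http://example.com/api/item/$i")
val responseRDD = sc.parallelize(fetchResponses(urls))

Because every request runs on the driver, this only makes sense when the full
response set fits comfortably in driver memory.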

Mohammed

-Original Message-
From: kasparfischer [mailto:kaspar.fisc...@dreizak.com] 
Sent: Wednesday, June 3, 2015 12:49 AM
To: user@spark.apache.org
Subject: Make HTTP requests from within Spark

Hi everybody,

I'm new to Spark, apologies if my question is very basic. 

I have a need to send millions of requests to a web service and analyse and 
store the responses in an RDD. I can easily express the analysing part using 
Spark's filter/map/etc. primitives but I don't know how to make the requests. 
Is that something I can do from within Spark? Or Spark Streaming?
Or does it conflict with the way Spark works?

I've found a similar question but am not sure whether the answer applies
here:

  
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html

Any clarifications or pointers would be super helpful!

Thanks,
Kaspar 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Make-HTTP-requests-from-within-Spark-tp23129.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Re: Make HTTP requests from within Spark

2015-06-03 Thread William Briggs
Hi Kaspar,

This is definitely doable, but in my opinion, it's important to remember
that, at its core, Spark is based around a functional programming paradigm
- you're taking input sets of data and, by applying various
transformations, you end up with a dataset that represents your "answer".
Without knowing more about your use case, and keeping in mind that I'm very
new to Spark, here are a few things I would want to think about if I were
writing this as a non-Streaming Spark application:

   1. What is your starting dataset? Do you have an initial set of
   parameters or a data source that is used to define each of the millions of
   requests? If so, then that should comprise your first RDD and you can
   perform subsequent transformations to prepare your HTTP requests (e.g.,
   start with the information that drives the generation of the requests, and
   use map/flatMap to create an RDD that has the full list of requests you
   want to run).
   2. Are the HTTP requests read-only, and/or idempotent (are you only
   looking up data, or are you performing requests that cause some sort of
   side effect)? Spark operations against RDDs work by defining a lineage
   graph, and transformations will be re-run if a partition in the lineage
   needs to be recalculated for any reason. If your HTTP requests are causing
   side-effects that should not be repeated, then Spark may not be the best
   fit for that portion of the job, and you might want to use something else,
   pipe the results into HDFS, and then analyze those using Spark.
   3. If your web service requests are lookups or are idempotent, then
   we're on the right track. Keep in mind that your web service probably will
   not scale as well as the Spark job - a naive first-pass implementation
   could easily overwhelm many services, particularly if/when partitions need
   to be recalculated. There are a few mechanisms you can use to mitigate this
   - one is to use mapPartitions rather than map when transforming the set of
   requests to the set of results, initialize an HTTP connection for each
   partition, and transform the data that defines the request into your
   desired dataset by invoking the web service. Using mapPartitions allows you
   to limit the number of concurrent HTTP connections to one per partition
   (although this may be too slow if your service is slow... there is
   obviously a bit of analysis, testing and profiling that would need to be
   done on the entire job). Another consideration would be to look at
   persisting or caching the intermediate results after you've successfully
   retrieved your results from the service, to reduce the likelihood of
   hitting the web service more than necessary.
   4. Just realized you might be looking for help invoking an HTTP service
   programmatically from Scala / Spark - if so, you might want to look at the
   spray-client library.
   5. With millions of web service requests, it's highly likely some will
   fail, for a variety of reasons. Look into using Scala's Try or Either
   monads to encode success / failure, and treat failed requests as
   first-class citizens in your RDD of results (by retrying them, filtering
   them, logging them, etc., based on your specific needs and use case).
   Make sure you are setting reasonable timeouts on your service calls to
   prevent the Spark job from getting stuck if the service turns into a
   black hole. A rough sketch combining mapPartitions and Try follows this
   list.
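
For illustration, here is a rough sketch of points 1, 3 and 5 combined. It is
not authoritative: the id range, the example URL, and the use of
scala.io.Source in place of a real pooled HTTP client are all placeholder
assumptions.

import scala.util.{Try, Success, Failure}

// 1. Start from the data that defines the requests (a hypothetical id range)
//    and derive one URL per request.
val params = sc.parallelize(1 to 1000000, 100)
val requests = params.map(id => s"http://example.com/lookup?id=$id")

// 3. mapPartitions allows per-partition setup (e.g. one HTTP client per
//    partition); scala.io.Source is used here only to keep the sketch
//    self-contained.
// 5. Wrapping each call in Try keeps failures in the RDD as data instead of
//    failing the whole task.
val results = requests.mapPartitions { urls =>
  urls.map(url => (url, Try(scala.io.Source.fromURL(url).mkString)))
}

// Cache so a recomputed partition does not hit the web service again.
results.cache()

val succeeded = results.collect { case (url, Success(body)) => (url, body) }
val failed    = results.collect { case (url, Failure(e))    => (url, e.getMessage) }

How you handle retries and persistence will of course depend on your service's
rate limits and failure modes.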

As I said above, I'm pretty new to Spark, so others may have some better
advice, or even tell you to ignore mine completely (no hard feelings, I
promise - this is all very new to me).

Good luck!

Regards,
Will

On Wed, Jun 3, 2015 at 3:49 AM, kasparfischer 
wrote:

> Hi everybody,
>
> I'm new to Spark, apologies if my question is very basic.
>
> I have a need to send millions of requests to a web service and analyse and
> store the responses in an RDD. I can easy express the analysing part using
> Spark's filter/map/etc. primitives but I don't know how to make the
> requests. Is that something I can do from within Spark? Or Spark Streaming?
> Or does it conflict with the way Spark works?
>
> I've found a similar question but am not sure whether the answer applies
> here:
>
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html
>
> Any clarifications or pointers would be super helpful!
>
> Thanks,
> Kaspar
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Make-HTTP-requests-from-within-Spark-tp23129.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...

Re: Make HTTP requests from within Spark

2015-06-03 Thread Pat McDonough
Try something like the following.

Create a function to make the HTTP call, e.g. using Apache HttpClient
(org.apache.http.impl.client.DefaultHttpClient), as below.

def getUrlAsString(url: String): String = {
  val client = new org.apache.http.impl.client.DefaultHttpClient()
  val request = new org.apache.http.client.methods.HttpGet(url)
  val response = client.execute(request)
  val handler = new org.apache.http.impl.client.BasicResponseHandler()
  handler.handleResponse(response).trim
}

Then build up your set of URLs and pass them to your HTTP function. This is
not the most basic example, but it includes some logic to handle paging for a
REST API and also to control the number of concurrent requests when you want
fewer than the number of available cores.


val (max, batchSize, threads) = (1500, 200, 20)

val calls = sc.parallelize(
  (0 to max by batchSize).map(
    page => s"https://some.url/jsonapi?_start=${page}&_limit=${batchSize}"),
  threads)

val debug = true  // flag assumed here; it is not defined in the original snippet

if (debug) {
  // Dump which partition (thread) handles which call, for sanity checking.
  def partMapper(index: Int, iter: Iterator[String]): Iterator[Map[String, Any]] = {
    iter.toList.map(callString => Map("thread" -> index, "call" -> callString)).iterator
  }
  calls.mapPartitionsWithIndex(partMapper).collect.foreach(println)
}

val callRDD = calls.map(getUrlAsString(_))

val yourDataFrame = sqlContext.jsonRDD(callRDD)
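
From there you can query the responses with Spark SQL. A small, assumed usage
example (assuming a sqlContext is in scope, as above):

yourDataFrame.registerTempTable("responses")
sqlContext.sql("SELECT count(*) FROM responses").show()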




On Wed, Jun 3, 2015 at 7:25 PM, William Briggs  wrote:

> Hi Kaspar,
>
> This is definitely doable, but in my opinion, it's important to remember
> that, at its core, Spark is based around a functional programming paradigm
> - you're taking input sets of data and, by applying various
> transformations, you end up with a dataset that represents your "answer".
> Without knowing more about your use case, and keeping in mind that I'm very
> new to Spark, here are a few things I would want to think about if I were
> writing this as a non-Streaming Spark application:
>
>1. What is your starting dataset? Do you have an initial set of
>parameters or a data source that is used to define each of the millions of
>requests? If so, then that should comprise your first RDD and you can
>perform subsequent transformations to prepare your HTTP requests (e.g.,
>start with the information that drives the generation of the requests, and
>use map/flatMap to create an RDD that has the full list of requests you
>want to run).
>2. Are the HTTP requests read-only, and/or idempotent (are you only
>looking up data, or are you performing requests that cause some sort of
>side effect)? Spark operations against RDDs work by defining a lineage
>graph, and transformations will be re-run if a partition in the lineage
>needs to be recalculated for any reason. If your HTTP requests are causing
>side-effects that should not be repeated, then Spark may not be the best
>fit for that portion of the job, and you might want to use something else,
>pipe the results into HDFS, and then analyze those using Spark..
>3. If your web service requests are lookups or are idempotent, then
>we're on the right track. Keep in mind that your web service probably will
>not scale as well as the Spark job - a naive first-pass implementation
>could easily overwhelm many services, particularly if/when partitions need
>to be recalculated. There are a few mechanisms you can use to mitigate this
>- one is to use mapPartitions rather than map when transforming the set of
>requests to the set of results, initialize an HTTP connection for each
>partition, and transform the data that defines the request into your
>desired dataset by invoking the web service. Using mapPartitions allows you
>to limit the number of concurrent HTTP connections to one per partition
>(although this may be too slow if your service is slow... there is
>obviously a bit of analysis, testing and profiling that would need to be
>done on the entire job). Another consideration would be to look at
>persisting or caching the intermediate results after you've successfully
>retrieved your results from the service, to reduce the likelihood of
>hitting the web service more than necessary.
>4. Just realized you might be looking for help invoking an HTTP
>service programmatically from Scala / Spark - if so, you might want to look
>at the spray-client 
>library.
>5. With millions of web service requests, it's highly likely some will
>fail, for a variety of reasons. Look into using Scala's Try
> or
>Either
> monads
>to encode success / failure, and treat failed requests as first-class
>citizens in your RDD of results (by retrying them, filtering them, logging
>them, etc., based on your specific needs and use case). Make sure you are
>setting reasonable timeouts on your service calls to preven