How To Save TF-IDF Model In PySpark

2016-01-15 Thread Asim Jalis
Hi,

I am trying to save a TF-IDF model in PySpark. Looks like this is not
supported.

Using `model.save()` causes:

AttributeError: 'IDFModel' object has no attribute 'save'

Using `pickle` causes:

TypeError: can't pickle lock objects

Does anyone have suggestions?

Thanks!

Asim

Here is the full repro. Start pyspark shell and then run this code in
it.

```
# Imports
from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import IDF

# Create some data
n = 4
freqs = [
    Vectors.sparse(n, (1, 3), (1.0, 2.0)),
    Vectors.dense([0.0, 1.0, 2.0, 3.0]),
    Vectors.sparse(n, [1], [1.0])]
data = sc.parallelize(freqs)
idf = IDF()
model = idf.fit(data)
tfidf = model.transform(data)

# View
for r in tfidf.collect(): print(r)

# Try to save it
model.save("foo.model")

# Try to save it with Pickle
import pickle
pickle.dump(model, open("model.p", "wb"))
pickle.dumps(model)
```
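One possible workaround (a sketch, not an official way to persist the model): IDFModel does expose the learned weights through model.idf(), and that plain vector pickles without trouble. After reloading the weights, the transform can be redone by hand, since it amounts to an elementwise multiply of each TF vector by the IDF weights. The file name idf_weights.p is arbitrary.

```
# Sketch: persist only the IDF weights and redo the scaling manually.
import pickle
from pyspark.mllib.linalg import Vectors, SparseVector

# Save just the learned IDF weights (a plain DenseVector, which pickles fine).
idf_vector = model.idf()
pickle.dump(idf_vector, open("idf_weights.p", "wb"))

# Later: reload the weights and scale the term-frequency vectors by hand.
idf_vector = pickle.load(open("idf_weights.p", "rb"))
w = idf_vector.toArray()

def apply_idf(tf):
    # Elementwise tf * idf, handling both sparse and dense mllib vectors.
    if isinstance(tf, SparseVector):
        return Vectors.sparse(tf.size, tf.indices, tf.values * w[tf.indices])
    return Vectors.dense(tf.toArray() * w)

tfidf2 = data.map(apply_idf)
```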


Re: MLlib: Feature Importances API

2015-12-17 Thread Asim Jalis
Yanbo,

Thanks for the reply.

Is there a JIRA for exposing featureImportances on
org.apache.spark.mllib.tree.RandomForest, or could you create one? I am
unable to create an issue on JIRA against Spark.

Thanks.

Asim

On Thu, Dec 17, 2015 at 12:07 AM, Yanbo Liang  wrote:

> Hi Asim,
>
> The "featureImportances" is only exposed at ML not MLlib.
> You need to update your code to use RandomForestClassifier of ML to train
> and get one RandomForestClassificationModel. Then you can call
> RandomForestClassificationModel.featureImportances
> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala#L237>
> to get the importances of each feature.
>
> For how to use RandomForestClassifier, you can refer this example
> <https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestClassifierExample.scala>
> .
>
> Yanbo
>
> 2015-12-17 13:41 GMT+08:00 Asim Jalis :
>
>> I wanted to get the feature importances of a Random Forest as
>> described in this JIRA: https://issues.apache.org/jira/browse/SPARK-5133
>>
>> However, I don’t see how to call this. I don't see any methods exposed on
>>
>> org.apache.spark.mllib.tree.RandomForest
>>
>> How can I get featureImportances when I generate a RandomForest model in
>> this code?
>>
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.regression.LabeledPoint
>> import org.apache.spark.mllib.tree.RandomForest
>> import org.apache.spark.mllib.tree.model.RandomForestModel
>> import org.apache.spark.mllib.util.MLUtils
>> import org.apache.spark.rdd.RDD
>> import util.Random
>>
>> def displayModel(model:RandomForestModel) = {
>>   // Display model.
>>   println("Learned classification tree model:\n" + model.toDebugString)
>> }
>>
>> def saveModel(model:RandomForestModel,path:String) = {
>>   // Save and load model.
>>   model.save(sc, path)
>>   val sameModel = RandomForestModel.load(sc, path)
>> }
>>
>> def testModel(model:RandomForestModel,testData:RDD[LabeledPoint]) = {
>>   // Test model.
>>   val labelAndPreds = testData.map { point =>
>> val prediction = model.predict(point.features)
>> (point.label, prediction)
>>   }
>>   val testErr = labelAndPreds.
>> filter(r => r._1 != r._2).count.toDouble / testData.count()
>>   println("Test Error = " + testErr)
>> }
>>
>> def buildModel(trainingData:RDD[LabeledPoint],
>>   numClasses:Int,categoricalFeaturesInfo:Map[Int,Int]) = {
>>   val numTrees = 30
>>   val featureSubsetStrategy = "auto"
>>   val impurity = "gini"
>>   val maxDepth = 4
>>   val maxBins = 32
>>
>>   // Build model.
>>   val model = RandomForest.trainClassifier(
>> trainingData, numClasses, categoricalFeaturesInfo,
>> numTrees, featureSubsetStrategy, impurity, maxDepth,
>> maxBins)
>>
>>   model
>> }
>>
>> // Create plain RDD.
>> val rdd = sc.parallelize(Range(0,1000))
>>
>> // Convert to LabeledPoint RDD.
>> val data = rdd.
>>   map(x => {
>> val label = x % 2
>> val feature1 = x % 5
>> val feature2 = x % 7
>> val features = Seq(feature1,feature2).
>>   map(_.toDouble).
>>   zipWithIndex.
>>   map(_.swap)
>> val vector = Vectors.sparse(features.size, features)
>> val point = new LabeledPoint(label, vector)
>> point })
>>
>> // Split data into training (70%) and test (30%).
>> val splits = data.randomSplit(Array(0.7, 0.3))
>> val (trainingData, testData) = (splits(0), splits(1))
>>
>> // Set up parameters for training.
>> val numClasses = data.map(_.label).distinct.count.toInt
>> val categoricalFeaturesInfo = Map[Int, Int]()
>>
>> val model = buildModel(
>> trainingData,
>> numClasses,
>> categoricalFeaturesInfo)
>> testModel(model,testData)
>>
>>
>
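For reference, a rough PySpark sketch of the ML-pipeline route suggested above (an illustration in Python rather than the linked Scala example): it assumes a Spark release whose Python ML API exposes featureImportances on RandomForestClassificationModel (2.0 or later); the toy DataFrame and column names are illustrative.

```
# Sketch: train with the DataFrame-based ML API and read featureImportances.
# Assumes Spark 2.0+; toy data mirrors the label/feature scheme in the code above.
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.linalg import Vectors  # pyspark.mllib.linalg on older releases
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# label = x % 2, features = (x % 5, x % 7), as in the mllib example.
rows = [(float(x % 2), Vectors.dense([float(x % 5), float(x % 7)]))
        for x in range(1000)]
df = spark.createDataFrame(rows, ["label", "features"])

rf = RandomForestClassifier(numTrees=30, maxDepth=4,
                            labelCol="label", featuresCol="features")
model = rf.fit(df)

# A vector with one importance weight per feature.
print(model.featureImportances)
```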


MLlib: Feature Importances API

2015-12-16 Thread Asim Jalis
I wanted to get the feature importances of a Random Forest as
described in this JIRA: https://issues.apache.org/jira/browse/SPARK-5133

However, I don’t see how to call this. I don't see any methods exposed on

org.apache.spark.mllib.tree.RandomForest

How can I get featureImportances when I generate a RandomForest model in
this code?

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import util.Random

def displayModel(model:RandomForestModel) = {
  // Display model.
  println("Learned classification tree model:\n" + model.toDebugString)
}

def saveModel(model:RandomForestModel,path:String) = {
  // Save and load model.
  model.save(sc, path)
  val sameModel = RandomForestModel.load(sc, path)
}

def testModel(model:RandomForestModel,testData:RDD[LabeledPoint]) = {
  // Test model.
  val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
  }
  val testErr = labelAndPreds.
filter(r => r._1 != r._2).count.toDouble / testData.count()
  println("Test Error = " + testErr)
}

def buildModel(trainingData:RDD[LabeledPoint],
  numClasses:Int,categoricalFeaturesInfo:Map[Int,Int]) = {
  val numTrees = 30
  val featureSubsetStrategy = "auto"
  val impurity = "gini"
  val maxDepth = 4
  val maxBins = 32

  // Build model.
  val model = RandomForest.trainClassifier(
trainingData, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth,
maxBins)

  model
}

// Create plain RDD.
val rdd = sc.parallelize(Range(0,1000))

// Convert to LabeledPoint RDD.
val data = rdd.
  map(x => {
val label = x % 2
val feature1 = x % 5
val feature2 = x % 7
val features = Seq(feature1,feature2).
  map(_.toDouble).
  zipWithIndex.
  map(_.swap)
val vector = Vectors.sparse(features.size, features)
val point = new LabeledPoint(label, vector)
point })

// Split data into training (70%) and test (30%).
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Set up parameters for training.
val numClasses = data.map(_.label).distinct.count.toInt
val categoricalFeaturesInfo = Map[Int, Int]()

val model = buildModel(
trainingData,
numClasses,
categoricalFeaturesInfo)
testModel(model,testData)


Python's ReduceByKeyAndWindow DStream Keeps Growing

2015-08-17 Thread Asim Jalis
When I use reduceByKeyAndWindow with func and invFunc (in PySpark), the size
of the window keeps growing. I am appending the code that reproduces this
issue. It prints out the count() of the DStream, which goes up by 10 elements
every batch.

Is this a bug in the Python version of Spark Streaming, or is this expected
behavior?

Here is the code that reproduces this issue.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pprint import pprint

print 'Initializing ssc'
ssc = StreamingContext(SparkContext(), batchDuration=1)
ssc.checkpoint('ckpt')

ds = ssc.textFileStream('input') \
    .map(lambda event: (event, 1)) \
    .reduceByKeyAndWindow(
        func=lambda count1, count2: count1 + count2,
        invFunc=lambda count1, count2: count1 - count2,
        windowDuration=10,
        slideDuration=2)

ds.pprint()
ds.count().pprint()

print 'Starting ssc'
ssc.start()

import itertools
import time
import random

from distutils import dir_util

def batch_write(batch_data, batch_file_path):
    with open(batch_file_path, 'w') as batch_file:
        for element in batch_data:
            line = str(element) + "\n"
            batch_file.write(line)

def xrange_write(
        batch_size = 5,
        batch_dir = 'input',
        batch_duration = 1):
    '''Every batch_duration write a file with batch_size numbers,
    forever. Start at 0 and keep incrementing. Intended for testing
    Spark Streaming code.'''

    dir_util.mkpath('./input')
    for i in itertools.count():
        min = batch_size * i
        max = batch_size * (i + 1)
        batch_data = xrange(min, max)
        file_path = batch_dir + '/' + str(i)
        batch_write(batch_data, file_path)
        time.sleep(batch_duration)

print 'Feeding data to app'
xrange_write()

ssc.awaitTermination()
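One detail that may be at play here (a guess, not a confirmed diagnosis): when an inverse function is supplied, keys whose windowed count has dropped to zero are kept in the state unless a filterFunc is also passed to reduceByKeyAndWindow to discard them. A sketch of the same call with that extra argument, everything else unchanged:

```
# Sketch: drop keys whose windowed count has fallen to zero so the
# window state does not keep growing.
ds = ssc.textFileStream('input') \
    .map(lambda event: (event, 1)) \
    .reduceByKeyAndWindow(
        func=lambda count1, count2: count1 + count2,
        invFunc=lambda count1, count2: count1 - count2,
        windowDuration=10,
        slideDuration=2,
        filterFunc=lambda kv: kv[1] > 0)
```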


Re: QueueStream Does Not Support Checkpointing

2015-08-14 Thread Asim Jalis
Another fix might be to remove the exception that is thrown when windowing
and other stateful operations are used without checkpointing.

On Fri, Aug 14, 2015 at 5:43 PM, Asim Jalis  wrote:

> I feel the real fix here is to remove the exception from QueueInputDStream
> class by reverting the fix of
> https://issues.apache.org/jira/browse/SPARK-8630
>
> I can write another class that is identical to the QueueInputDStream class
> except it does not throw the exception. But this feels like a convoluted
> solution.
>
> Throwing exceptions to forbid behavior in code is risky because it can
> easily break legitimate uses of a class.
>
> Is there a way to reopen https://issues.apache.org/jira/browse/SPARK-8630?
> I have added a comment to it, but I am not sure if that will have that
> effect.
>
> Thanks.
>
> Asim
>
> On Fri, Aug 14, 2015 at 4:03 PM, Holden Karau 
> wrote:
>
>> I just pushed some code that does this for spark-testing-base (
>> https://github.com/holdenk/spark-testing-base ) (it's in master) and
>> will publish an updated artifact with it for tonight.
>>
>> On Fri, Aug 14, 2015 at 3:35 PM, Tathagata Das 
>> wrote:
>>
>>> A hacky workaround is to create a custom InputDStream that creates the
>>> right RDDs based on a function. The TestInputDStream
>>> <https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala#L61>
>>> does something similar for Spark Streaming unit tests.
>>>
>>> TD
>>>
>>> On Fri, Aug 14, 2015 at 1:04 PM, Asim Jalis  wrote:
>>>
>>>> I want to test some Spark Streaming code that is using
>>>> reduceByKeyAndWindow. If I do not enable checkpointing, I get the error:
>>>>
>>>> java.lang.IllegalArgumentException: requirement failed: The checkpoint
>>>>> directory has not been set. Please set it by 
>>>>> StreamingContext.checkpoint().
>>>>
>>>>
>>>> But if I enable checkpointing I get
>>>>
>>>> queueStream doesn't support checkpointing
>>>>
>>>>
>>>> Is there a workaround for this?
>>>>
>>>> My goal is to test that the windowing logic in my code is correct. Is
>>>> there a way to disable these strict checks or a different dstream I can use
>>>> that I can populate programmatically and then use for testing?
>>>>
>>>> Thanks.
>>>>
>>>> Asim
>>>>
>>>>
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>> Linked In: https://www.linkedin.com/in/holdenkarau
>>
>
>


Re: QueueStream Does Not Support Checkpointing

2015-08-14 Thread Asim Jalis
I feel the real fix here is to remove the exception from QueueInputDStream
class by reverting the fix of
https://issues.apache.org/jira/browse/SPARK-8630

I can write another class that is identical to the QueueInputDStream class
except it does not throw the exception. But this feels like a convoluted
solution.

Throwing exceptions to forbid behavior in code is risky because it can
easily break legitimate uses of a class.

Is there a way to reopen https://issues.apache.org/jira/browse/SPARK-8630?
I have added a comment to it, but I am not sure if that will have that
effect.

Thanks.

Asim

On Fri, Aug 14, 2015 at 4:03 PM, Holden Karau  wrote:

> I just pushed some code that does this for spark-testing-base (
> https://github.com/holdenk/spark-testing-base ) (it's in master) and will
> publish an updated artifact with it for tonight.
>
> On Fri, Aug 14, 2015 at 3:35 PM, Tathagata Das 
> wrote:
>
>> A hacky workaround is to create a custom InputDStream that creates the
>> right RDDs based on a function. The TestInputDStream
>> <https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala#L61>
>> does something similar for Spark Streaming unit tests.
>>
>> TD
>>
>> On Fri, Aug 14, 2015 at 1:04 PM, Asim Jalis  wrote:
>>
>>> I want to test some Spark Streaming code that is using
>>> reduceByKeyAndWindow. If I do not enable checkpointing, I get the error:
>>>
>>> java.lang.IllegalArgumentException: requirement failed: The checkpoint
>>>> directory has not been set. Please set it by StreamingContext.checkpoint().
>>>
>>>
>>> But if I enable checkpointing I get
>>>
>>> queueStream doesn't support checkpointing
>>>
>>>
>>> Is there a workaround for this?
>>>
>>> My goal is to test that the windowing logic in my code is correct. Is
>>> there a way to disable these strict checks or a different dstream I can use
>>> that I can populate programmatically and then use for testing?
>>>
>>> Thanks.
>>>
>>> Asim
>>>
>>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
> Linked In: https://www.linkedin.com/in/holdenkarau
>


QueueStream Does Not Support Checkpointing

2015-08-14 Thread Asim Jalis
I want to test some Spark Streaming code that is using
reduceByKeyAndWindow. If I do not enable checkpointing, I get the error:

java.lang.IllegalArgumentException: requirement failed: The checkpoint
> directory has not been set. Please set it by StreamingContext.checkpoint().


But if I enable checkpointing I get

queueStream doesn't support checkpointing


Is there a workaround for this?

My goal is to test that the windowing logic in my code is correct. Is there
a way to disable these strict checks, or a different DStream that I can
populate programmatically and then use for testing?

Thanks.

Asim


Streaming Checkpointing

2015-01-09 Thread Asim Jalis
In Spark Streaming apps, if I enable ssc.checkpoint(dir), does this
checkpoint all RDDs, or just the windowing and state RDDs?

For example, if in a DStream I am using an iterative algorithm on a
non-state non-window RDD, do I have to checkpoint it explicitly myself, or
can I assume that ssc.checkpoint has taken care of checkpointing it?


Initial State of updateStateByKey

2015-01-08 Thread Asim Jalis
In Spark Streaming, is there a way to initialize the state
of updateStateByKey before it starts processing RDDs? I noticed that there
is an overload of updateStateByKey that takes an initialRDD in the latest
sources (although not in the 1.2.0 release). Is there another way to do
this until this feature is released?
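One workaround until the initialRDD overload lands in a release (a sketch under assumptions, not an established API): carry the key inside the value so the update function can see it, and fall back to a broadcast table of initial states the first time a key appears. sc, pairs, and the sample states below are illustrative.

```
# Sketch: seed updateStateByKey from a broadcast lookup on first sight of a key.
# 'pairs' stands for a DStream of (key, count) pairs; the states are illustrative.
initial_states = sc.broadcast({"a": 10, "b": 20})

def update(new_values, state):
    if state is None and new_values:
        key = new_values[0][0]                    # key smuggled in the value
        state = initial_states.value.get(key, 0)
    return (state or 0) + sum(count for _, count in new_values)

running = pairs.map(lambda kv: (kv[0], (kv[0], kv[1]))) \
               .updateStateByKey(update)
running.pprint()
```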


Spark Streaming Checkpointing

2015-01-08 Thread Asim Jalis
Since checkpointing in streaming apps happens every checkpoint duration, in
the event of failure, how is the system able to recover the state changes
that happened after the last checkpoint?


Join RDDs with DStreams

2015-01-08 Thread Asim Jalis
Is there a way to join non-DStream RDDs with DStream RDDs?

Here is the use case. I have a lookup table stored in HDFS that I want to
read as an RDD. Then I want to join it with the RDDs that are coming in
through the DStream. How can I do this?

Thanks.

Asim
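One common approach, sketched below under assumptions: read the lookup table once as a regular pair RDD, cache it, and join it against every micro-batch inside DStream.transform. The HDFS path, the parsing, and the events DStream are placeholders.

```
# Sketch: join a static lookup RDD against each batch of a DStream.
# The path, the parsing, and 'events' are illustrative placeholders.
lookup = sc.textFile("hdfs:///path/to/lookup.csv") \
           .map(lambda line: line.split(",")) \
           .map(lambda parts: (parts[0], parts[1]))   # (key, attribute)
lookup.cache()

# 'events' is a DStream of (key, value) pairs.
joined = events.transform(lambda rdd: rdd.join(lookup))
joined.pprint()
```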


Re: disable log4j for spark-shell

2015-01-07 Thread Asim Jalis
Another option is to make a copy of log4j.properties in the current
directory where you start spark-shell from, and modify
"log4j.rootCategory=INFO, console" to "log4j.rootCategory=ERROR, console".
Then start the shell.

On Wed, Jan 7, 2015 at 3:39 AM, Akhil  wrote:

> Edit your conf/log4j.properties file and change the following line:
>
>log4j.rootCategory=INFO, console
>
> to
>
> log4j.rootCategory=ERROR, console
>
> Another approach would be to:
>
> Fire up spark-shell and type in the following:
>
> import org.apache.log4j.Logger
> import org.apache.log4j.Level
>
> Logger.getLogger("org").setLevel(Level.OFF)
> Logger.getLogger("akka").setLevel(Level.OFF)
>
> You won't see any logs after that.
>
>
>
>
>
>


Re: RDD Moving Average

2015-01-06 Thread Asim Jalis
One approach I was considering was to use mapPartitions. It is
straightforward to compute the moving average within a partition, except
near the end of each partition. Does anyone see how to fix that?
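One way to patch up the boundaries, as a rough sketch in Python: collect the first window - 1 elements of every partition into a broadcast, and let each partition borrow its successor's head so the windows that spill over can still be completed. It assumes a numeric RDD that is already sorted, with no partition shorter than window - 1; the names are illustrative.

```
# Rough sketch: complete the windows that cross a partition boundary by
# borrowing the first (window - 1) elements of the next partition.
from itertools import islice

def moving_average(rdd, window):
    # First (window - 1) elements of every partition, keyed by partition index.
    def heads(idx, it):
        yield idx, list(islice(it, window - 1))
    heads_by_part = dict(rdd.mapPartitionsWithIndex(heads).collect())
    bc = rdd.context.broadcast(heads_by_part)

    # Extend each partition with its successor's head, then slide the window.
    def averages(idx, it):
        vals = list(it) + bc.value.get(idx + 1, [])
        for i in range(len(vals) - window + 1):
            yield sum(vals[i:i + window]) / float(window)

    return rdd.mapPartitionsWithIndex(averages)
```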

On Tue, Jan 6, 2015 at 7:20 PM, Sean Owen  wrote:

> Interesting, I am not sure the order in which fold() encounters elements
> is guaranteed, although from reading the code, I imagine in practice it is
> first-to-last by partition and then folded first-to-last from those results
> on the driver. I don't know that this would lead to a solution though, as the
> result here needs to be an RDD, not one value.
>
> On Wed, Jan 7, 2015 at 12:10 AM, Paolo Platter 
> wrote:
>
>>  In my opinion you should use the fold pattern, obviously after a sortBy
>> transformation.
>>
>> Paolo
>>
>> Sent from my Windows Phone
>>  --
>> From: Asim Jalis 
>> Sent: 06/01/2015 23:11
>> To: Sean Owen 
>> Cc: user@spark.apache.org
>> Subject: Re: RDD Moving Average
>>
>>   One problem with this is that we are creating a lot of iterables
>> containing a lot of repeated data. Is there a way to do this so that we can
>> calculate a moving average incrementally?
>>
>> On Tue, Jan 6, 2015 at 4:44 PM, Sean Owen  wrote:
>>
>>> Yes, if you break it down to...
>>>
>>>  tickerRDD.map(ticker =>
>>>   (ticker.timestamp, ticker)
>>> ).map { case(ts, ticker) =>
>>>   ((ts / 60000) * 60000, ticker)
>>> }.groupByKey
>>>
>>>  ... as Michael alluded to, then it more naturally extends to the
>>> sliding window, since you can flatMap one Ticker to many (bucket, ticker)
>>> pairs, then group. I think this would implement 1 minute buckets,
>>> sliding by 10 seconds:
>>>
>>>  tickerRDD.flatMap(ticker =>
>>>   (ticker.timestamp - 60000 to ticker.timestamp by 15000).map(ts => (ts,
>>> ticker))
>>> ).map { case(ts, ticker) =>
>>>   ((ts / 60000) * 60000, ticker)
>>> }.groupByKey
>>>
>>> On Tue, Jan 6, 2015 at 8:47 PM, Asim Jalis  wrote:
>>>
>>>>  I guess I can use a similar groupBy approach. Map each event to all
>>>> the windows that it can belong to. Then do a groupBy, etc. I was wondering
>>>> if there was a more elegant approach.
>>>>
>>>> On Tue, Jan 6, 2015 at 3:45 PM, Asim Jalis  wrote:
>>>>
>>>>>  Except I want it to be a sliding window. So the same record could be
>>>>> in multiple buckets.
>>>>>
>>>>>
>>
>


Re: RDD Moving Average

2015-01-06 Thread Asim Jalis
One problem with this is that we are creating a lot of iterables containing
a lot of repeated data. Is there a way to do this so that we can calculate
a moving average incrementally?

On Tue, Jan 6, 2015 at 4:44 PM, Sean Owen  wrote:

> Yes, if you break it down to...
>
> tickerRDD.map(ticker =>
>   (ticker.timestamp, ticker)
> ).map { case(ts, ticker) =>
>   ((ts / 60000) * 60000, ticker)
> }.groupByKey
>
> ... as Michael alluded to, then it more naturally extends to the sliding
> window, since you can flatMap one Ticker to many (bucket, ticker) pairs,
> then group. I think this would implementing 1 minute buckets, sliding by 10
> seconds:
>
> tickerRDD.flatMap(ticker =>
>   (ticker.timestamp - 60000 to ticker.timestamp by 15000).map(ts => (ts,
> ticker))
> ).map { case(ts, ticker) =>
>   ((ts / 60000) * 60000, ticker)
> }.groupByKey
>
> On Tue, Jan 6, 2015 at 8:47 PM, Asim Jalis  wrote:
>
>> I guess I can use a similar groupBy approach. Map each event to all the
>> windows that it can belong to. Then do a groupBy, etc. I was wondering if
>> there was a more elegant approach.
>>
>> On Tue, Jan 6, 2015 at 3:45 PM, Asim Jalis  wrote:
>>
>>> Except I want it to be a sliding window. So the same record could be in
>>> multiple buckets.
>>>
>>>


Re: RDD Moving Average

2015-01-06 Thread Asim Jalis
I guess I can use a similar groupBy approach. Map each event to all the
windows that it can belong to. Then do a groupBy, etc. I was wondering if
there was a more elegant approach.

On Tue, Jan 6, 2015 at 3:45 PM, Asim Jalis  wrote:

> Except I want it to be a sliding window. So the same record could be in
> multiple buckets.
>
> On Tue, Jan 6, 2015 at 3:43 PM, Sean Owen  wrote:
>
>> So you want windows covering the same length of time, some of which will
>> be fuller than others? You could, for example, simply bucket the data by
>> minute to get this kind of effect. If you have an RDD[Ticker], where Ticker has
>> a timestamp in ms, you could:
>>
>> tickerRDD.groupBy(ticker => (ticker.timestamp / 60000) * 60000)
>>
>> ... to get an RDD[(Long,Iterable[Ticker])], where the keys are the moment
>> at the start of each minute, and the values are the Tickers within the
>> following minute. You can try variations on this to bucket in different
>> ways.
>>
>> Just be careful because a minute with a huge number of values might cause
>> you to run out of memory. If you're just doing aggregations of some kind
>> there are more efficient methods than this most generic method, like the
>> aggregate methods.
>>
>> On Tue, Jan 6, 2015 at 8:34 PM, Asim Jalis  wrote:
>>
>>> Thanks. Another question. I have event data with timestamps. I want to
>>> create a sliding window using timestamps. Some windows will have a lot of
>>> events in them, others won’t. Is there a way to get an RDD made of this kind
>>> of variable-length window?
>>>
>>>
>>> On Tue, Jan 6, 2015 at 1:03 PM, Sean Owen  wrote:
>>>
>>>> First you'd need to sort the RDD to give it a meaningful order, but I
>>>> assume you have some kind of timestamp in your data you can sort on.
>>>>
>>>> I think you might be after the sliding() function, a developer API in
>>>> MLlib:
>>>>
>>>>
>>>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala#L43
>>>>
>>>> On Tue, Jan 6, 2015 at 5:25 PM, Asim Jalis  wrote:
>>>>
>>>>> Is there an easy way to do a moving average across a single RDD (in a
>>>>> non-streaming app)? Here is the use case. I have an RDD made up of stock
>>>>> prices. I want to calculate a moving average using a window size of N.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Asim
>>>>>
>>>>
>>>>
>>>
>>
>


Re: RDD Moving Average

2015-01-06 Thread Asim Jalis
Except I want it to be a sliding window. So the same record could be in
multiple buckets.

On Tue, Jan 6, 2015 at 3:43 PM, Sean Owen  wrote:

> So you want windows covering the same length of time, some of which will
> be fuller than others? You could, for example, simply bucket the data by
> minute to get this kind of effect. If you have an RDD[Ticker], where Ticker has
> a timestamp in ms, you could:
>
> tickerRDD.groupBy(ticker => (ticker.timestamp / 60000) * 60000)
>
> ... to get an RDD[(Long,Iterable[Ticker])], where the keys are the moment
> at the start of each minute, and the values are the Tickers within the
> following minute. You can try variations on this to bucket in different
> ways.
>
> Just be careful because a minute with a huge number of values might cause
> you to run out of memory. If you're just doing aggregations of some kind
> there are more efficient methods than this most generic method, like the
> aggregate methods.
>
> On Tue, Jan 6, 2015 at 8:34 PM, Asim Jalis  wrote:
>
>> Thanks. Another question. I have event data with timestamps. I want to
>> create a sliding window using timestamps. Some windows will have a lot of
>> events in them, others won’t. Is there a way to get an RDD made of this kind
>> of variable-length window?
>>
>>
>> On Tue, Jan 6, 2015 at 1:03 PM, Sean Owen  wrote:
>>
>>> First you'd need to sort the RDD to give it a meaningful order, but I
>>> assume you have some kind of timestamp in your data you can sort on.
>>>
>>> I think you might be after the sliding() function, a developer API in
>>> MLlib:
>>>
>>>
>>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala#L43
>>>
>>> On Tue, Jan 6, 2015 at 5:25 PM, Asim Jalis  wrote:
>>>
>>>> Is there an easy way to do a moving average across a single RDD (in a
>>>> non-streaming app)? Here is the use case. I have an RDD made up of stock
>>>> prices. I want to calculate a moving average using a window size of N.
>>>>
>>>> Thanks.
>>>>
>>>> Asim
>>>>
>>>
>>>
>>
>


Re: RDD Moving Average

2015-01-06 Thread Asim Jalis
Thanks. Another question. I have event data with timestamps. I want to
create a sliding window using timestamps. Some windows will have a lot of
events in them, others won’t. Is there a way to get an RDD made of this kind
of variable-length window?


On Tue, Jan 6, 2015 at 1:03 PM, Sean Owen  wrote:

> First you'd need to sort the RDD to give it a meaningful order, but I
> assume you have some kind of timestamp in your data you can sort on.
>
> I think you might be after the sliding() function, a developer API in
> MLlib:
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala#L43
>
> On Tue, Jan 6, 2015 at 5:25 PM, Asim Jalis  wrote:
>
>> Is there an easy way to do a moving average across a single RDD (in a
>> non-streaming app)? Here is the use case. I have an RDD made up of stock
>> prices. I want to calculate a moving average using a window size of N.
>>
>> Thanks.
>>
>> Asim
>>
>
>


RDD Moving Average

2015-01-06 Thread Asim Jalis
Is there an easy way to do a moving average across a single RDD (in a
non-streaming app)? Here is the use case. I have an RDD made up of stock
prices. I want to calculate a moving average using a window size of N.

Thanks.

Asim


Spark Streaming Threading Model

2014-12-19 Thread Asim Jalis
Q: In Spark Streaming, if your DStream transformation and output action take
longer than the batch duration, will the system process the next batch in
another thread? Or will it just wait until the first batch’s RDD is
processed? In other words, does it build up a queue of buffered RDDs
awaiting processing, or does it just process them?

Asim


Spark Streaming Reusing JDBC Connections

2014-12-05 Thread Asim Jalis
Is there a way I can keep a JDBC connection open throughout a streaming job?
I have a foreach which runs once per batch. However, I don’t want to open a
new connection for each batch, but would rather have a persistent connection
that I can reuse. How can I do this?

Thanks.

Asim
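The usual pattern, sketched below under assumptions: if the writes happen on the executors, a connection opened on the driver cannot be serialized out to them, so the common approach is to open one connection per partition inside foreachRDD / foreachPartition (a per-executor pool held in a module-level global may stretch reuse further across batches, where worker processes are reused). In PySpark the "JDBC" piece would be whatever DB-API driver the database provides; db_connect and the INSERT statement are placeholders.

```
# Sketch: one connection per partition instead of one per record (or one
# opened on the driver). db_connect() and the SQL are placeholders.
def save_partition(records):
    conn = db_connect()              # e.g. a DB-API connect() call
    cursor = conn.cursor()
    for record in records:
        cursor.execute("INSERT INTO events VALUES (%s, %s)", record)
    conn.commit()
    conn.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(save_partition))
```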