Dataframe size using RDDStorageInfo objects

2018-03-16 Thread Bahubali Jain
Hi,
I am trying to figure out a way to find the size of *persisted* dataframes
using *sparkContext.getRDDStorageInfo()*.
The RDDStorageInfo object has information on the number of bytes stored
in memory and on disk.

For example:
I have 3 dataframes which I have cached:
df1.cache()
df2.cache()
df3.cache()

The call to sparkContext.getRDDStorageInfo() would return an array of 3
RDDStorageInfo objects, since I have cached 3 dataframes.
But there is no way to map df1, df2 and df3 to each of the
RDDStorageInfo objects.
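One workaround I am considering (a rough sketch; it assumes Spark 2.x names the
cached entry after the registered table) is to cache through the catalog and then
match on the name field:

val df1 = spark.range(100).toDF("id")          // placeholder dataframe
df1.createOrReplaceTempView("df1")
spark.catalog.cacheTable("df1")                // storage entry should show up as "In-memory table df1"

spark.sparkContext.getRDDStorageInfo
  .filter(_.name.contains("df1"))              // match by name rather than by RDD id
  .foreach(info => println(s"${info.name}: memSize=${info.memSize} diskSize=${info.diskSize}"))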

Is there any better way out?

Thanks,
Baahu


Compression during shuffle writes

2017-11-09 Thread Bahubali Jain
Hi,
I have compressed data of size 500GB. I am repartitioning this data since
the underlying data is very skewed and is causing a lot of issues for the
downstream jobs.
During repartitioning the *shuffle writes* are not getting compressed, and due to
this I am running into disk space issues. Below is the screenshot which
clearly depicts the issue (Input, Shuffle Write columns).
I have proactively set the below parameters to true, but it still doesn't
compress the intermediate shuffled data:

spark.shuffle.compress
spark.shuffle.spill.compress
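For reference, this is roughly how the flags are being set (a sketch; the codec
line is an extra assumption on my side, not something already in the job):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("RepartitionJob")                  // placeholder app name
  .set("spark.shuffle.compress", "true")         // compress map-side shuffle output
  .set("spark.shuffle.spill.compress", "true")   // compress data spilled during shuffles
  .set("spark.io.compression.codec", "snappy")   // codec used for the compressed blocks
val sc = new SparkContext(conf)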

[image: Inline image 1]

I am using Spark 1.5 (for various unavoidable reasons!!)
Any suggestions would be greatly appreciated.

Thanks,
Baahu


Re: Dataset : Issue with Save

2017-03-16 Thread Bahubali Jain
I am using Spark 2.0. There are comments in the ticket since Oct-2016
which clearly mention that the issue still persists even in 2.0.
I agree 1G is very small in today's world, and I have already resolved this by
increasing
*spark.driver.maxResultSize*.
I was more intrigued as to why the data is being sent to the driver during
save (similar to the collect() action). Are there any plans to fix this
behavior/issue?
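For completeness, this is roughly what I changed (a sketch; the 4g value is just
what I picked, not a recommendation):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DatasetSave")
  .config("spark.driver.maxResultSize", "4g")   // the default of 1g was exceeded by the 3722-task write
  .getOrCreate()

mydataset.write.csv("outputlocation")           // mydataset and the save are the same as in the earlier mail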

Thanks,
Baahu

On Fri, Mar 17, 2017 at 8:17 AM, Yong Zhang  wrote:

> Did you read the JIRA ticket? Are you confirming that it is fixed in Spark
> 2.0, or you complain that it still exists in Spark 2.0?
>
>
> First, you didn't tell us what version of your Spark you are using. The
> JIRA clearly said that it is a bug in Spark 1.x, and should be fixed in
> Spark 2.0. So help yourself and the community, to confirm if this is the
> case.
>
>
> If you are looking for workaround, the JIRA ticket clearly show you how to
> increase your driver heap. 1G in today's world really is kind of small.
>
>
> Yong
>
>
> --
> *From:* Bahubali Jain 
> *Sent:* Thursday, March 16, 2017 10:34 PM
> *To:* Yong Zhang
> *Cc:* user@spark.apache.org
> *Subject:* Re: Dataset : Issue with Save
>
> Hi,
> Was this not yet resolved?
> It's a very common requirement to save a dataframe; is there a better way
> to save a dataframe while avoiding data being sent to the driver?
>
>
> * "Total size of serialized results of 3722 tasks (1024.0 MB) is bigger
> than spark.driver.maxResultSize (1024.0 MB) " *
> Thanks,
> Baahu
>
> On Fri, Mar 17, 2017 at 1:19 AM, Yong Zhang  wrote:
>
>> You can take a look of https://issues.apache.org/jira/browse/SPARK-12837
>>
>>
>> Yong
>> Spark driver requires large memory space for serialized ...
>> <https://issues.apache.org/jira/browse/SPARK-12837>
>> issues.apache.org
>> Executing a sql statement with a large number of partitions requires a
>> high memory space for the driver even there are no requests to collect data
>> back to the driver.
>>
>>
>>
>> --
>> *From:* Bahubali Jain 
>> *Sent:* Thursday, March 16, 2017 1:39 PM
>> *To:* user@spark.apache.org
>> *Subject:* Dataset : Issue with Save
>>
>> Hi,
>> While saving a dataset using   *
>> mydataset.write().csv("outputlocation")  * I am running
>> into an exception
>>
>>
>>
>> * "Total size of serialized results of 3722 tasks (1024.0 MB) is bigger
>> than spark.driver.maxResultSize (1024.0 MB)" *
>> Does it mean that for saving a dataset whole of the dataset contents are
>> being sent to driver ,similar to collect()  action?
>>
>> Thanks,
>> Baahu
>>
>
>
>
> --
> Twitter:http://twitter.com/Baahu
>
>


-- 
Twitter:http://twitter.com/Baahu


Re: Dataset : Issue with Save

2017-03-16 Thread Bahubali Jain
Hi,
Was this not yet resolved?
It's a very common requirement to save a dataframe; is there a better way to
save a dataframe while avoiding data being sent to the driver?


*"Total size of serialized results of 3722 tasks (1024.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB) "*
Thanks,
Baahu

On Fri, Mar 17, 2017 at 1:19 AM, Yong Zhang  wrote:

> You can take a look of https://issues.apache.org/jira/browse/SPARK-12837
>
>
> Yong
> Spark driver requires large memory space for serialized ...
> <https://issues.apache.org/jira/browse/SPARK-12837>
> issues.apache.org
> Executing a sql statement with a large number of partitions requires a
> high memory space for the driver even there are no requests to collect data
> back to the driver.
>
>
>
> --
> *From:* Bahubali Jain 
> *Sent:* Thursday, March 16, 2017 1:39 PM
> *To:* user@spark.apache.org
> *Subject:* Dataset : Issue with Save
>
> Hi,
> While saving a dataset using   *
> mydataset.write().csv("outputlocation")  * I am running
> into an exception
>
>
>
> * "Total size of serialized results of 3722 tasks (1024.0 MB) is bigger
> than spark.driver.maxResultSize (1024.0 MB)" *
> Does it mean that for saving a dataset whole of the dataset contents are
> being sent to driver ,similar to collect()  action?
>
> Thanks,
> Baahu
>



-- 
Twitter:http://twitter.com/Baahu


Dataset : Issue with Save

2017-03-16 Thread Bahubali Jain
Hi,
While saving a dataset using *mydataset.write().csv("outputlocation")* I am
running into an exception:



*"Total size of serialized results of 3722 tasks (1024.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)"*
Does it mean that for saving a dataset, the whole of the dataset contents are
being sent to the driver, similar to the collect() action?

Thanks,
Baahu


SPARK ML- Feature Selection Techniques

2016-09-05 Thread Bahubali Jain
Hi,
Do we have any feature selection technique implementations (wrapper
methods, embedded methods) available in Spark ML?
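So far the only built-in selector I have found is the filter-style ChiSqSelector
(a sketch below; column names and the dataframe are placeholders):

import org.apache.spark.ml.feature.ChiSqSelector

val selector = new ChiSqSelector()
  .setNumTopFeatures(20)               // keep the 20 highest-scoring features
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures")

val selected = selector.fit(trainingData).transform(trainingData)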

Thanks,
Baahu
-- 
Twitter:http://twitter.com/Baahu


Re: Random Forest Classification

2016-08-30 Thread Bahubali Jain
Hi Bryan,
Thanks for the reply.
I am indexing 5 columns, then using these indexed columns to generate the
"feature" column through the vector assembler.
Which essentially means that I cannot use *fit()* directly on the
"completeDataset" dataframe, since it will have neither the "feature" column
nor the 5 indexed columns.
Of course there is a dirty way of doing this (sketched below), but I am wondering
if there is some optimized/intelligent approach for this.
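The "dirty" way I have in mind is roughly the following (a sketch; it assumes
running the already-fitted indexers and the assembler once over completeDataset
so that the "feature" column exists before fitting the VectorIndexer):

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, VectorIndexer}

// fit each StringIndexer on the complete dataset
val indexers = Seq("col1", "col2", "col3", "col4", "col5").map { c =>
  new StringIndexer().setInputCol(c).setOutputCol(s"indexed_$c").fit(completeDataset)
}
val assembler = new VectorAssembler()
  .setInputCols(indexers.map(_.getOutputCol).toArray)
  .setOutputCol("feature")

// run the fitted indexers and the assembler once to materialize "feature" on all rows
val withFeature = assembler.transform(
  indexers.foldLeft(completeDataset)((df, model) => model.transform(df)))

// now the VectorIndexer can be fitted on the entire data
val featureVectorIndexerModel = new VectorIndexer()
  .setInputCol("feature")
  .setOutputCol("indexedfeature")
  .setMaxCategories(180)
  .fit(withFeature)

// the fitted models act as plain transformers inside the pipeline
// (decisionTree, completeDataset and trainingSet are the ones from the earlier mail)
val stages: Array[PipelineStage] =
  (indexers :+ assembler :+ featureVectorIndexerModel :+ decisionTree).toArray
val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(trainingSet)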

Thanks,
Baahu

On Wed, Aug 31, 2016 at 3:30 AM, Bryan Cutler  wrote:

> You need to first fit just the VectorIndexer which returns the model, then
> add the model to the pipeline where it will only transform.
>
> val featureVectorIndexer = new VectorIndexer()
> .setInputCol("feature")
> .setOutputCol("indexedfeature")
> .setMaxCategories(180)
>     .fit(completeDataset)
>
> On Tue, Aug 30, 2016 at 9:57 AM, Bahubali Jain  wrote:
>
>> Hi,
>> I had run into similar exception " java.util.NoSuchElementException: key
>> not found: " .
>> After further investigation I realized it is happening due to
>> vectorindexer being executed on training dataset and not on entire dataset.
>>
>> In the dataframe I have 5 categories , each of these have to go thru
>> stringindexer and then these are put thru a vector indexer to generate
>> feature vector.
>> What is the right way to do this, so that vector indexer can be run on
>> the entire data and not just on training data?
>>
>> Below is the current approach, as evident  VectorIndexer is being
>> generated based on the training set.
>>
>> Please Note: fit() on Vectorindexer cannot be called on entireset
>> dataframe since it doesn't have the required column(*feature *column is
>> being generated dynamically in pipeline execution)
>> How can the vectorindexer be *fit()* on the entireset?
>>
>> val col1_indexer = new StringIndexer().setInputCol("col1").setOutputCol("indexed_col1")
>> val col2_indexer = new StringIndexer().setInputCol("col2").setOutputCol("indexed_col2")
>> val col3_indexer = new StringIndexer().setInputCol("col3").setOutputCol("indexed_col3")
>> val col4_indexer = new StringIndexer().setInputCol("col4").setOutputCol("indexed_col4")
>> val col5_indexer = new StringIndexer().setInputCol("col5").setOutputCol("indexed_col5")
>>
>> val featureArray = Array("indexed_col1","indexed_col2","indexed_col3","indexed_col4","indexed_col5")
>> val vectorAssembler = new VectorAssembler().setInputCols(featureArray).setOutputCol("*feature*")
>> val featureVectorIndexer = new VectorIndexer()
>>   .setInputCol("feature")
>>   .setOutputCol("indexedfeature")
>>   .setMaxCategories(180)
>>
>> val decisionTree = new DecisionTreeClassifier()
>>   .setMaxBins(300).setMaxDepth(1).setImpurity("entropy")
>>   .setLabelCol("indexed_user_action").setFeaturesCol("indexedfeature")
>>   .setPredictionCol("prediction")
>>
>> val pipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer, col3_indexer,
>>   col4_indexer, col5_indexer, vectorAssembler, featureVectorIndexer, decisionTree))
>> val model = pipeline.*fit(trainingSet)*
>> val output = model.transform(cvSet)
>>
>>
>> Thanks,
>> Baahu
>>
>> On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler  wrote:
>>
>>> Hi Rich,
>>>
>>> I looked at the notebook and it seems like you are fitting the
>>> StringIndexer and VectorIndexer to only the training data, and it should
>>> be the entire data set.  So if the training data does not include all of
>>> the labels and an unknown label appears in the test data during evaluation,
>>> then it will not know how to index it.  So your code should be like this,
>>> fit with 'digits' instead of 'training'
>>>
>>> val labelIndexer = new StringIndexer().setInputCol("l
>>> abel").setOutputCol("indexedLabel").fit(digits)
>>> // Automatically identify categorical features, and index them.
>>> // Set maxCategories so features with > 4 distinct values are treated as
>>> continuous.
>>> val featureIndexer = new VectorIndexer().setInputCol("f
>>> eatures").setOutputCol("indexedFeatures").setMaxCategories(4
>>> ).fit(digits)
>>>
>>> Hope that helps!
>>>

Re: Random Forest Classification

2016-08-30 Thread Bahubali Jain
Hi,
I had run into a similar exception, "java.util.NoSuchElementException: key
not found:".
After further investigation I realized it is happening because the VectorIndexer
is being executed on the training dataset and not on the entire dataset.

In the dataframe I have 5 categorical columns; each of these has to go through a
StringIndexer, and then these are put through a VectorIndexer to generate the
feature vector.
What is the right way to do this, so that the VectorIndexer can be run on the
entire data and not just on the training data?

Below is the current approach; as is evident, the VectorIndexer is being fitted
based on the training set.

Please note: fit() on the VectorIndexer cannot be called on the entire-set
dataframe since it doesn't have the required column (the *feature* column is
being generated dynamically during pipeline execution).
How can the VectorIndexer be *fit()* on the entire set?

 val col1_indexer = new StringIndexer().setInputCol("col1").setOutputCol("indexed_col1")
 val col2_indexer = new StringIndexer().setInputCol("col2").setOutputCol("indexed_col2")
 val col3_indexer = new StringIndexer().setInputCol("col3").setOutputCol("indexed_col3")
 val col4_indexer = new StringIndexer().setInputCol("col4").setOutputCol("indexed_col4")
 val col5_indexer = new StringIndexer().setInputCol("col5").setOutputCol("indexed_col5")

 val featureArray = Array("indexed_col1","indexed_col2","indexed_col3","indexed_col4","indexed_col5")
 val vectorAssembler = new VectorAssembler().setInputCols(featureArray).setOutputCol("*feature*")
 val featureVectorIndexer = new VectorIndexer()
   .setInputCol("feature")
   .setOutputCol("indexedfeature")
   .setMaxCategories(180)

 val decisionTree = new DecisionTreeClassifier()
   .setMaxBins(300).setMaxDepth(1).setImpurity("entropy")
   .setLabelCol("indexed_user_action").setFeaturesCol("indexedfeature")
   .setPredictionCol("prediction")

 val pipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer, col3_indexer,
   col4_indexer, col5_indexer, vectorAssembler, featureVectorIndexer, decisionTree))
 val model = pipeline.*fit(trainingSet)*
 val output = model.transform(cvSet)


Thanks,
Baahu

On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler  wrote:

> Hi Rich,
>
> I looked at the notebook and it seems like you are fitting the
> StringIndexer and VectorIndexer to only the training data, and it should
> be the entire data set.  So if the training data does not include all of
> the labels and an unknown label appears in the test data during evaluation,
> then it will not know how to index it.  So your code should be like this,
> fit with 'digits' instead of 'training'
>
> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("
> indexedLabel").fit(digits)
> // Automatically identify categorical features, and index them.
> // Set maxCategories so features with > 4 distinct values are treated as
> continuous.
> val featureIndexer = new VectorIndexer().setInputCol("
> features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(digits)
>
> Hope that helps!
>
> On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro  wrote:
>
>> Hi Bryan.
>>
>> Thanks for your continued help.
>>
>> Here is the code shown in a Jupyter notebook. I figured this was easier
>> than cutting and pasting the code into an email. If you would like me to
>> send you the code in a different format, let me know. The necessary data is
>> all downloaded within the notebook itself.
>>
>> https://console.ng.bluemix.net/data/notebooks/fe7e578a-
>> 401f-4744-a318-b1b6bcf6f5f8/view?access_token=
>> 2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>>
>> A few additional pieces of information.
>>
>> 1. The training dataset is cached before training the model. If you do
>> not cache the training dataset, the model will not train. The code
>> model.transform(test) fails with a similar error. No other changes besides
>> caching or not caching. Again, with the training dataset cached, the model
>> can be successfully trained as seen in the notebook.
>>
>> 2. I have another version of the notebook where I download the same data
>> in libsvm format rather than csv. That notebook works fine. All the code is
>> essentially the same accounting for the difference in file formats.
>>
>> 3. I tested this same code on another Spark cloud platform and it
>> displays the same symptoms when run there.
>>
>> Thanks.
>> Rich
>>
>>
>> On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler  wrote:
>>
>>> Are you fitting the VectorIndexer to the entire data set and not just
>>> training or test data?  If you are able to post your code and some data to
>>> reproduce, that would help in troubleshooting.
>>>
>>> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro  wrote:
>>>
 Thanks for the response, but in my case I reversed the meaning of
 "prediction" and "predictedLabel". It seemed to make more sense to me that
 way, but in retrospect, it probably only causes confusion to anyone else
 looking at this. I reran the code with all the pipeline stage inputs and
 outputs named exa

Large files with wholetextfile()

2016-07-12 Thread Bahubali Jain
Hi,
We have a requirement wherein we need to process a set of XML files; each of
the XML files contains several records (e.g.:

 data of record 1..



data of record 2..


Expected output is   

Since we needed the file name as well in the output, we chose wholeTextFiles(). We
had to go against using StreamXmlRecordReader and StreamInputFormat since I
could not find a way to retrieve the filename.

These XML files could be pretty big; occasionally they could reach a size
of 1GB. Since the contents of each file would be put into a single partition,
would such big files be an issue?
The AWS cluster (50 nodes) that we use is fairly strong, with each machine
having around 60GB of memory.
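For context, the current plan looks roughly like this (a sketch; the record tag
below is a placeholder, since the real tags did not survive in the mail above):

// each (path, content) pair holds one whole file; minPartitions only spreads files, not file contents
val xmlFiles = sc.wholeTextFiles("hdfs:///data/xml/", 200)

val records = xmlFiles.flatMap { case (path, content) =>
  // <record>...</record> stands in for the real record tag
  content.split("</record>")
    .filter(_.contains("<record"))
    .map(fragment => (path, fragment.trim + "</record>"))
}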

Thanks,
Baahu


DAG related query

2015-08-20 Thread Bahubali Jain
Hi,
What would the DAG look like for the below code?

JavaRDD rdd1 = context.textFile();
JavaRDD rdd2 = rdd1.map();
rdd1 =  rdd2.map();
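To see how the lineage actually ends up, I was planning to print it with
toDebugString (a sketch in Scala; the path and map functions are placeholders):

val rdd1 = sc.textFile("input")       // first RDD in the chain
val rdd2 = rdd1.map(_.toUpperCase)    // second RDD, parent = rdd1
val rdd3 = rdd2.map(_.trim)           // third RDD, parent = rdd2 (which variable it is assigned to is irrelevant)
println(rdd3.toDebugString)           // prints the chain of parent RDDs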

Does this lead to any kind of cycle?

Thanks,
Baahu


JavaRDD and saveAsNewAPIHadoopFile()

2015-06-27 Thread Bahubali Jain
Hi,
Why doesn't JavaRDD have saveAsNewAPIHadoopFile() associated with it?
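The workaround I am using for now is to go through a pair RDD, since
saveAsNewAPIHadoopFile() lives on JavaPairRDD / pair RDDs (a sketch in Scala;
key/value types and the path are assumptions):

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// wrap each value with a NullWritable key so the pair-RDD save method can be used
val pairs = rdd.map(value => (NullWritable.get(), new Text(value.toString)))
pairs.saveAsNewAPIHadoopFile(
  "hdfs:///output/path",
  classOf[NullWritable],
  classOf[Text],
  classOf[TextOutputFormat[NullWritable, Text]])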


Thanks,
Baahu

-- 
Twitter:http://twitter.com/Baahu


Multiple dir support : newApiHadoopFile

2015-06-26 Thread Bahubali Jain
Hi,
How do we read files from multiple directories using newAPIHadoopFile()?
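What I am trying currently (a sketch; my understanding is that the path string
goes through FileInputFormat, which accepts comma-separated paths and globs,
but please correct me if that is wrong):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// comma-separated directories in a single path string
val rdd = sc.newAPIHadoopFile(
  "hdfs:///logs/dir1,hdfs:///logs/dir2,hdfs:///logs/dir3",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])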

Thanks,
Baahu
-- 
Twitter:http://twitter.com/Baahu


Pseudo Spark Streaming ?

2015-04-05 Thread Bahubali Jain
Hi,
I have a requirement in which I plan to use SPARK Streaming.
I am supposed to calculate the access count to certain webpages. I receive
the webpage access information through log files.
By access count I mean "how many times was the page accessed *till now*".
I have the log files for the past 2 years, and every day we keep receiving almost
6 GB of access logs (on an hourly basis).
Since we receive these logs on an hourly basis, I feel that I should use
SPARK Streaming.
But the problem is that the access counts have to be cumulative, i.e. even
the older accesses (from the past 2 years) for a webpage should also be
considered for the final value.

How do I achieve this through streaming, since streaming picks up only new files?
I don't want to use a DB to store the access counts since it would
considerably slow down the processing.
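The shape I have in mind is roughly the following (a sketch; the parsing and the
initial-state overload of updateStateByKey are assumptions on my side):

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{Seconds, StreamingContext}

def parsePage(line: String): String = line.split(" ")(6)   // placeholder: position of the URL field

val ssc = new StreamingContext(sc, Seconds(3600))          // hourly batches
ssc.checkpoint("hdfs:///checkpoints/access-counts")

// one-off pass over the 2 years of historical logs to seed the running totals
val historicalCounts = sc.textFile("hdfs:///logs/history/*")
  .map(line => (parsePage(line), 1L))
  .reduceByKey(_ + _)

val updateTotals: (Seq[Long], Option[Long]) => Option[Long] =
  (hits, total) => Some(total.getOrElse(0L) + hits.sum)

val cumulativeCounts = ssc.textFileStream("hdfs:///logs/incoming")
  .map(line => (parsePage(line), 1L))
  .updateStateByKey(updateTotals, new HashPartitioner(sc.defaultParallelism), historicalCounts)

cumulativeCounts.print()
ssc.start()
ssc.awaitTermination()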

Thanks,
Baahu
-- 
Twitter:http://twitter.com/Baahu


Re: Writing to HDFS from spark Streaming

2015-02-15 Thread Bahubali Jain
I used the latest assembly jar and the cast below, as suggested by Akhil, to fix
this problem:
temp.saveAsHadoopFiles("DailyCSV",".txt", String.class, String.class,
*(Class)* TextOutputFormat.class);

Thanks All for the help !

On Wed, Feb 11, 2015 at 1:38 PM, Sean Owen  wrote:

> That kinda dodges the problem by ignoring generic types. But it may be
> simpler than the 'real' solution, which is a bit ugly.
>
> (But first, to double check, are you importing the correct
> TextOutputFormat? there are two versions. You use .mapred. with the
> old API and .mapreduce. with the new API.)
>
> Here's how I've formally casted around it in similar code:
>
> @SuppressWarnings
> Class<? extends OutputFormat<?, ?>> outputFormatClass =
>     (Class<? extends OutputFormat<?, ?>>) (Class<?>) TextOutputFormat.class;
>
> and then pass that as the final argument.
>
> On Wed, Feb 11, 2015 at 6:35 AM, Akhil Das 
> wrote:
> > Did you try :
> >
> > temp.saveAsHadoopFiles("DailyCSV",".txt", String.class,
> String.class,(Class)
> > TextOutputFormat.class);
> >
> > Thanks
> > Best Regards
> >
> > On Wed, Feb 11, 2015 at 9:40 AM, Bahubali Jain 
> wrote:
> >>
> >> Hi,
> >> I am facing issues while writing data from a streaming rdd to hdfs..
> >>
> >> JavaPairDstream temp;
> >> ...
> >> ...
> >> temp.saveAsHadoopFiles("DailyCSV",".txt", String.class,
> >> String.class,TextOutputFormat.class);
> >>
> >>
> >> I see compilation issues as below...
> >> The method saveAsHadoopFiles(String, String, Class, Class, Class >> extends OutputFormat>) in the type JavaPairDStream
> is
> >> not applicable for the arguments (String, String, Class,
> >> Class, Class)
> >>
> >> I see same kind of problem even with saveAsNewAPIHadoopFiles API .
> >>
> >> Thanks,
> >> Baahu
> >
> >
>



-- 
Twitter:http://twitter.com/Baahu


Writing to HDFS from spark Streaming

2015-02-10 Thread Bahubali Jain
Hi,
I am facing issues while writing data from a streaming RDD to HDFS.

JavaPairDStream<String, String> temp;
...
...
temp.saveAsHadoopFiles("DailyCSV",".txt", String.class,
String.class,TextOutputFormat.class);


I see compilation issues as below...
The method saveAsHadoopFiles(String, String, Class, Class, Class>) in the type JavaPairDStream is
not applicable for the arguments (String, String, Class,
Class, Class)

I see the same kind of problem even with the saveAsNewAPIHadoopFiles API.

Thanks,
Baahu


textFileStream() issue?

2014-12-03 Thread Bahubali Jain
Hi,
I am trying to use textFileStream("some_hdfs_location") to pick up new files
from an HDFS location. I am seeing a pretty strange behavior though.
textFileStream() is not detecting new files when I "move" them from a
location within HDFS to the location at which textFileStream() is checking for
new files.
But when I copy files from a location in the Linux filesystem to HDFS, then
textFileStream() is detecting the new files.

Is this a known issue?

Thanks,
Baahu


Re: Time based aggregation in Real time Spark Streaming

2014-12-01 Thread Bahubali Jain
Hi,
You can associate all the messages of a 3-minute interval with a unique key,
then group by that key and finally add up the counts, as sketched below.
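A sketch of that idea (it assumes the records arrive as (itemId, itemType,
timestamp) tuples per batch and the timestamp format shown in your sample):

import java.text.SimpleDateFormat

val fmt = new SimpleDateFormat("d-M-yyyy:HH:mm")   // matches "1-12-2014:12:01"
val sliceMs = 3 * 60 * 1000L

val counts = messages                               // messages: RDD[(String, String, String)] is assumed
  .map { case (itemId, itemType, ts) =>
    val slice = fmt.parse(ts).getTime / sliceMs     // same value for every record in one 3-minute window
    ((itemId, itemType, slice), 1L)
  }
  .reduceByKey(_ + _)                               // count per (item, type, 3-minute slice)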

Thanks
On Dec 1, 2014 9:02 PM, "pankaj"  wrote:

> Hi,
>
> My incoming message has time stamp as one field and i have to perform
> aggregation over 3 minute of time slice.
>
> Message sample
>
> "Item ID" "Item Type" "timeStamp"
> 1  X   1-12-2014:12:01
> 1  X   1-12-2014:12:02
> 1  X   1-12-2014:12:03
> 1  y   1-12-2014:12:04
> 1  y   1-12-2014:12:05
> 1  y   1-12-2014:12:06
>
> Aggregation Result
> ItemId  ItemType  count  aggregationStartTime  aggrEndTime
> 1       X         3      1-12-2014:12:01       1-12-2014:12:03
> 1       y         3      1-12-2014:12:04       1-12-2014:12:06
>
> What is the best way to perform time based aggregation in spark.
> Kindly suggest.
>
> Thanks
>
> --
> View this message in context: Time based aggregation in Real time Spark
> Streaming
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Help with Spark Streaming

2014-11-16 Thread Bahubali Jain
Hi,
Can anybody help me on this please? I haven't been able to find the problem :(

Thanks.
On Nov 15, 2014 4:48 PM, "Bahubali Jain"  wrote:

> Hi,
> Trying to use Spark Streaming, but I am struggling with word count :(
> I want to consolidate the output of the word count (not on a per-window basis),
> so I am using updateStateByKey(), but for some reason this is not working.
> The function itself is not being invoked (I do not see the sysout output on the
> console).
>
>
> public final class WordCount {
>   private static final Pattern SPACE = Pattern.compile(" ");
>
>   public static void main(String[] args) {
>     if (args.length < 2) {
>       System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
>       System.exit(1);
>     }
>
>     // Create the context with a 1 second batch size
>     SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
>     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
>     ssc.checkpoint("/tmp/worcount");
>     // Create a JavaReceiverInputDStream on target ip:port and count the
>     // words in input stream of \n delimited text (eg. generated by 'nc')
>     // Note that no duplication in storage level only for running locally.
>     // Replication necessary in distributed scenario for fault tolerance.
>     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
>         args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
>     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
>       @Override
>       public Iterable<String> call(String x) {
>         return Lists.newArrayList(SPACE.split(x));
>       }
>     });
>
>     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>       new PairFunction<String, String, Integer>() {
>         @Override
>         public Tuple2<String, Integer> call(String s) {
>           System.err.println("Got " + s);
>           return new Tuple2<String, Integer>(s, 1);
>         }
>       }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>         @Override
>         public Integer call(Integer i1, Integer i2) {
>           return i1 + i2;
>         }
>       });
>
>     wordCounts.print();
>
>     *wordCounts.updateStateByKey(new updateFunction());*
>     ssc.start();
>     ssc.awaitTermination();
>   }
> }
>
> class updateFunction implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
>
>   @Override
>   public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
>     Integer x = new Integer(0);
>     for (Integer i : values)
>       x = x + i;
>     Integer newSum = state.or(0) + x;  // add the new values to the previous running count to get the new count
>     System.out.println("Newsum is " + newSum);
>     return Optional.of(newSum);
>   }
> }
>


Help with Spark Streaming

2014-11-15 Thread Bahubali Jain
Hi,
Trying to use Spark Streaming, but I am struggling with word count :(
I want to consolidate the output of the word count (not on a per-window basis), so
I am using updateStateByKey(), but for some reason this is not working.
The function itself is not being invoked (I do not see the sysout output on the
console).


public final class WordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
      System.exit(1);
    }

    // Create the context with a 1 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
    ssc.checkpoint("/tmp/worcount");
    // Create a JavaReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
        args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          System.err.println("Got " + s);
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    wordCounts.print();

    *wordCounts.updateStateByKey(new updateFunction());*
    ssc.start();
    ssc.awaitTermination();
  }
}

class updateFunction implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {

  @Override
  public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
    Integer x = new Integer(0);
    for (Integer i : values)
      x = x + i;
    Integer newSum = state.or(0) + x;  // add the new values to the previous running count to get the new count
    System.out.println("Newsum is " + newSum);
    return Optional.of(newSum);
  }
}


Issue with Custom Key Class

2014-11-08 Thread Bahubali Jain
Hi,
I have a custom key class. In this class, equals() and hashCode() have been
overridden.
I have a JavaPairRDD which has this class as the key. When groupByKey() or
reduceByKey() is called, a null object is being passed to
*equals*(Object obj), and as a result the grouping is failing.
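For reference, this is the shape I am moving the key class towards (a simplified
sketch in Scala; the real class is in Java and the fields are placeholders) so
that equals() at least tolerates a null argument:

// equals() returns false for a null argument instead of dereferencing it
class CustomKey(val id: Long, val name: String) extends Serializable {

  override def hashCode(): Int =
    31 * id.hashCode + (if (name == null) 0 else name.hashCode)

  override def equals(obj: Any): Boolean = obj match {
    case other: CustomKey => other.id == id && other.name == name
    case _                => false   // covers null and any non-CustomKey argument
  }
}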
Is this a known issue?
I am using Spark 0.9 version.

Thanks,
Baahu


-- 
Twitter:http://twitter.com/Baahu