Insert a JavaPairDStream into multiple Cassandra tables on the basis of key.

2016-11-02 Thread Abhishek Anand
Hi All,

I have a JavaPairDStream. I want to insert this dstream into multiple
Cassandra tables on the basis of key. One approach is to filter on each key
and then insert the filtered stream into its Cassandra table, but this would
invoke the filter operation "n" times, once per key.

Is there any better approach to achieve this more quickly ?

Thanks
Abhi
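
A minimal sketch of one alternative, assuming the DataStax
spark-cassandra-connector Java API is on the classpath and the key-to-table
mapping is known up front (pairDStream, the bean class MyBean, the map
tableForKey and the keyspace name are hypothetical, not from the original
thread). Persisting the batch RDD once keeps the per-key filters from
recomputing the source, so the extra passes are cheap scans over cached data:

import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

pairDStream.foreachRDD(rdd -> {
    // Cache once so each per-key filter scans materialized data instead of recomputing it.
    JavaPairRDD<String, MyBean> cached = rdd.persist(StorageLevel.MEMORY_AND_DISK());
    for (Map.Entry<String, String> e : tableForKey.entrySet()) {   // key -> table name
        String key = e.getKey();
        javaFunctions(cached.filter(t -> t._1().equals(key)).values())
            .writerBuilder("my_keyspace", e.getValue(), mapToRow(MyBean.class))
            .saveToCassandra();
    }
    cached.unpersist();
    return null;   // foreachRDD takes a Function<..., Void> in Spark 1.x
});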


Re: Finding unique across all columns in dataset

2016-09-19 Thread Abhishek Anand
Hi Ayan,

How will I get the column-wise distinct items using this approach ?

On Mon, Sep 19, 2016 at 3:31 PM, ayan guha  wrote:

> Create an array out of columns, convert to a DataFrame,
> explode, distinct, write.
> On 19 Sep 2016 19:11, "Saurav Sinha"  wrote:
>
>> You can use distinct over your data frame or RDD
>>
>> rdd.distinct
>>
>> It will give you the distinct rows.
>>
>> On Mon, Sep 19, 2016 at 2:35 PM, Abhishek Anand 
>> wrote:
>>
>>> I have an RDD which contains 14 different columns. I need to find the
>>> distinct values across all the columns of the RDD and write them to HDFS.
>>>
>>> How can I achieve this ?
>>>
>>> Is there any distributed data structure that I can use and keep on
>>> updating it as I traverse the new rows ?
>>>
>>> Regards,
>>> Abhi
>>>
>>
>>
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>
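
A minimal Java sketch of the column-wise variant, assuming the rows have
already been loaded into a DataFrame df (the HDFS path is a placeholder).
Caching the DataFrame keeps the one-pass-per-column cost down; Ayan's
array/explode/distinct suggestion collapses everything into a single distinct
over (column, value) pairs instead.

df.cache();   // reused once per column below
for (String c : df.columns()) {
    df.select(c).distinct()
      .write().mode("overwrite")
      .parquet("hdfs:///tmp/distinct/" + c);   // one output directory per column
}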


Finding unique across all columns in dataset

2016-09-19 Thread Abhishek Anand
I have an RDD which contains 14 different columns. I need to find the
distinct values across all the columns of the RDD and write them to HDFS.

How can I achieve this ?

Is there any distributed data structure that I can use and keep on updating
it as I traverse the new rows ?

Regards,
Abhi


Re: Concatenate the columns in dataframe to create new columns using Java

2016-07-18 Thread Abhishek Anand
Thanks Nihed.

I was able to do this in exactly the same way.


Cheers!!
Abhi

On Mon, Jul 18, 2016 at 5:56 PM, nihed mbarek  wrote:

> and if we have this static method
> df.show();
> Column c = concatFunction(df, "l1", "firstname,lastname");
> df.select(c).show();
>
> with this code :
> Column concatFunction(DataFrame df, String fieldName, String columns) {
>     String[] array = columns.split(",");
>     Column[] concatColumns = new Column[array.length];
>     for (int i = 0; i < concatColumns.length; i++) {
>         concatColumns[i] = df.col(array[i]);
>     }
>     return functions.concat(concatColumns).alias(fieldName);
> }
>
>
>
> On Mon, Jul 18, 2016 at 2:14 PM, Abhishek Anand 
> wrote:
>
>> Hi Nihed,
>>
>> Thanks for the reply.
>>
>> I am looking for something like this :
>>
>> DataFrame training = orgdf.withColumn("I1",
>> functions.concat(orgdf.col("C0"),orgdf.col("C1")));
>>
>>
>> Here I have to hard-code the C0 and C1 columns. I am looking to write a generic
>> function that concatenates the columns depending on the input columns.
>>
>> For example, if I have something like
>> String str = "C0,C1,C2"
>>
>> Then it should work as
>>
>> DataFrame training = orgdf.withColumn("I1",
>> functions.concat(orgdf.col("C0"),orgdf.col("C1"),orgdf.col("C2")));
>>
>>
>>
>> Thanks,
>> Abhi
>>
>> On Mon, Jul 18, 2016 at 4:39 PM, nihed mbarek  wrote:
>>
>>> Hi,
>>>
>>>
>>> I just wrote this code to help you. Is it what you need ??
>>>
>>>
>>> SparkConf conf = new
>>> SparkConf().setAppName("hello").setMaster("local");
>>> JavaSparkContext sc = new JavaSparkContext(conf);
>>> SQLContext sqlContext = new SQLContext(sc);
>>> List<Person> persons = new ArrayList<>();
>>> persons.add(new Person("nihed", "mbarek", "nihed.com"));
>>> persons.add(new Person("mark", "zuckerberg", "facebook.com"));
>>>
>>> DataFrame df = sqlContext.createDataFrame(persons, Person.class);
>>>
>>> df.show();
>>> final String[] columns = df.columns();
>>> Column[] selectColumns = new Column[columns.length + 1];
>>> for (int i = 0; i < columns.length; i++) {
>>>     selectColumns[i] = df.col(columns[i]);
>>> }
>>>
>>> selectColumns[columns.length] = functions.concat(df.col("firstname"),
>>>     df.col("lastname"));
>>>
>>> df.select(selectColumns).show();
>>> ---
>>> public static class Person {
>>>
>>> private String firstname;
>>> private String lastname;
>>> private String address;
>>> }
>>>
>>>
>>>
>>> Regards,
>>>
>>> On Mon, Jul 18, 2016 at 12:45 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a dataframe say having C0,C1,C2 and so on as columns.
>>>>
>>>> I need to create interaction variables to be taken as input for my
>>>> program.
>>>>
>>>> For eg -
>>>>
>>>> I need to create I1 as concatenation of C0,C3,C5
>>>>
>>>> Similarly, I2  = concat(C4,C5)
>>>>
>>>> and so on ..
>>>>
>>>>
>>>> How can I achieve this in my Java code for concatenation of any columns
>>>> given as input by the user ?
>>>>
>>>> Thanks,
>>>> Abhi
>>>>
>>>
>>>
>>>
>>> --
>>>
>>> M'BAREK Med Nihed,
>>> Fedora Ambassador, TUNISIA, Northern Africa
>>> http://www.nihed.com
>>>
>>> <http://tn.linkedin.com/in/nihed>
>>>
>>>
>>
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> <http://tn.linkedin.com/in/nihed>
>
>
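
For reference, a short usage sketch of the concatFunction helper quoted above,
applied to the interaction columns from the original question (C0/C3/C5 and
C4/C5 are the column names from that example; the second argument is just the
alias the helper puts on the resulting Column):

DataFrame training = orgdf
    .withColumn("I1", concatFunction(orgdf, "I1", "C0,C3,C5"))
    .withColumn("I2", concatFunction(orgdf, "I2", "C4,C5"));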


Re: Concatenate the columns in dataframe to create new columns using Java

2016-07-18 Thread Abhishek Anand
Hi Nihed,

Thanks for the reply.

I am looking for something like this :

DataFrame training = orgdf.withColumn("I1",
functions.concat(orgdf.col("C0"),orgdf.col("C1")));


Here I have to hard-code the C0 and C1 columns. I am looking to write a generic
function that concatenates the columns depending on the input columns.

For example, if I have something like
String str = "C0,C1,C2"

Then it should work as

DataFrame training = orgdf.withColumn("I1",
functions.concat(orgdf.col("C0"),orgdf.col("C1"),orgdf.col("C2")));



Thanks,
Abhi

On Mon, Jul 18, 2016 at 4:39 PM, nihed mbarek  wrote:

> Hi,
>
>
> I just wrote this code to help you. Is it what you need ??
>
>
> SparkConf conf = new
> SparkConf().setAppName("hello").setMaster("local");
> JavaSparkContext sc = new JavaSparkContext(conf);
> SQLContext sqlContext = new SQLContext(sc);
> List<Person> persons = new ArrayList<>();
> persons.add(new Person("nihed", "mbarek", "nihed.com"));
> persons.add(new Person("mark", "zuckerberg", "facebook.com"));
>
> DataFrame df = sqlContext.createDataFrame(persons, Person.class);
>
> df.show();
> final String[] columns = df.columns();
> Column[] selectColumns = new Column[columns.length + 1];
> for (int i = 0; i < columns.length; i++) {
>     selectColumns[i] = df.col(columns[i]);
> }
>
> selectColumns[columns.length] = functions.concat(df.col("firstname"),
>     df.col("lastname"));
>
> df.select(selectColumns).show();
> ---
> public static class Person {
>
> private String firstname;
> private String lastname;
> private String address;
> }
>
>
>
> Regards,
>
> On Mon, Jul 18, 2016 at 12:45 PM, Abhishek Anand 
> wrote:
>
>> Hi,
>>
>> I have a dataframe say having C0,C1,C2 and so on as columns.
>>
>> I need to create interaction variables to be taken as input for my
>> program.
>>
>> For eg -
>>
>> I need to create I1 as concatenation of C0,C3,C5
>>
>> Similarly, I2  = concat(C4,C5)
>>
>> and so on ..
>>
>>
>> How can I achieve this in my Java code for concatenation of any columns
>> given as input by the user ?
>>
>> Thanks,
>> Abhi
>>
>
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> <http://tn.linkedin.com/in/nihed>
>
>


Concatenate the columns in dataframe to create new columns using Java

2016-07-18 Thread Abhishek Anand
Hi,

I have a dataframe say having C0,C1,C2 and so on as columns.

I need to create interaction variables to be taken as input for my program.

For eg -

I need to create I1 as concatenation of C0,C3,C5

Similarly, I2  = concat(C4,C5)

and so on ..


How can I achieve this in my Java code for concatenation of any columns
given as input by the user ?

Thanks,
Abhi


Change spark dataframe to LabeledPoint in Java

2016-06-30 Thread Abhishek Anand
Hi ,

I have a dataframe which I want to convert to labeled points.

DataFrame labeleddf = model.transform(newdf).select("label","features");

How can I convert this to an RDD of LabeledPoint to use in my Logistic Regression
model ?

I could do this in scala using
val trainData = labeleddf.map(row =>
LabeledPoint(row.getDouble(0), row(1).asInstanceOf[Vector])).cache()


How can I achieve the same in Java ?

Thanks,
Abhi
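
A minimal Java equivalent of the Scala line above, assuming Spark 1.x, where
the ML "features" column holds an org.apache.spark.mllib.linalg.Vector:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.regression.LabeledPoint;

JavaRDD<LabeledPoint> trainData = labeleddf.javaRDD()
    .map(row -> new LabeledPoint(row.getDouble(0), (Vector) row.get(1)))
    .cache();

With Java 7 the lambda becomes an anonymous Function<Row, LabeledPoint>, but the
conversion itself is the same.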


Re: spark.hadoop.dfs.replication parameter not working for kafka-spark streaming

2016-05-31 Thread Abhishek Anand
I also tried


jsc.sparkContext().sc().hadoopConfiguration().set("dfs.replication", "2")

But, it is still not working.

Any ideas why it is not working ?


Abhi

On Tue, May 31, 2016 at 4:03 PM, Abhishek Anand 
wrote:

> My spark streaming checkpoint directory is being written to HDFS with
> default replication factor of 3.
>
> In my streaming application, where I am listening to kafka and setting
> dfs.replication = 2 as below, the files are still being written with
> replication factor = 3
>
> SparkConf sparkConfig = new
> SparkConf().setMaster("mymaster").set("spark.hadoop.dfs.replication", "2");
>
> Is there anything else that I need to do ??
>
>
> Thanks !!!
> Abhi
>


spark.hadoop.dfs.replication parameter not working for kafka-spark streaming

2016-05-31 Thread Abhishek Anand
My spark streaming checkpoint directory is being written to HDFS with
default replication factor of 3.

In my streaming application, where I am listening to kafka and setting
dfs.replication = 2 as below, the files are still being written with
replication factor = 3

SparkConf sparkConfig = new
SparkConf().setMaster("mymaster").set("spark.hadoop.dfs.replication", "2");

Is there anything else that I need to do ??


Thanks !!!
Abhi


Re: Running glm in sparkR (data pre-processing step)

2016-05-30 Thread Abhishek Anand
Thanks Yanbo.

So, you mean that if I have a variable which is of type double but I want
to treat it like a String (categorical) in my model, I just have to cast those
columns into string and simply run the glm model ? String columns will be
directly one-hot encoded by the glm provided by SparkR ?

Just wanted to clarify as in R we need to apply as.factor for categorical
variables.

val dfNew = df.withColumn("C0",df.col("C0").cast("String"))


Abhi !!

On Mon, May 30, 2016 at 2:58 PM, Yanbo Liang  wrote:

> Hi Abhi,
>
> In SparkR glm, category features (columns of type string) will be one-hot
> encoded automatically.
> So pre-processing like `as.factor` is not necessary, you can directly feed
> your data to the model training.
>
> Thanks
> Yanbo
>
> 2016-05-30 2:06 GMT-07:00 Abhishek Anand :
>
>> Hi ,
>>
>> I want to run glm variant of sparkR for my data that is there in a csv
>> file.
>>
>> I see that the glm function in sparkR takes a spark dataframe as input.
>>
>> Now, when I read a file from csv and create a spark dataframe, how could
>> I take care of the factor variables/columns in my data ?
>>
>> Do I need to convert it to a R dataframe, convert to factor using
>> as.factor and create spark dataframe and run glm over it ?
>>
>> But, running as.factor over a big dataset is not possible.
>>
>> Please suggest what is the best way to achieve this ?
>>
>> What pre-processing should be done, and what is the best way to achieve
>> it  ?
>>
>>
>> Thanks,
>> Abhi
>>
>
>


Running glm in sparkR (data pre-processing step)

2016-05-30 Thread Abhishek Anand
Hi ,

I want to run glm variant of sparkR for my data that is there in a csv file.

I see that the glm function in sparkR takes a spark dataframe as input.

Now, when I read a file from csv and create a spark dataframe, how could I
take care of the factor variables/columns in my data ?

Do I need to convert it to a R dataframe, convert to factor using as.factor
and create spark dataframe and run glm over it ?

But, running as.factor over a big dataset is not possible.

Please suggest what is the best way to achieve this ?

What pre-processing should be done, and what is the best way to achieve it
 ?


Thanks,
Abhi


Calculating log-loss for the trained model in Spark ML

2016-05-03 Thread Abhishek Anand
I am building a ML pipeline for logistic regression.

val lr = new LogisticRegression()

lr.setMaxIter(100).setRegParam(0.001)

val pipeline = new
Pipeline().setStages(Array(geoDimEncoder,clientTypeEncoder,
   devTypeDimIdEncoder,pubClientIdEncoder,tmpltIdEncoder,
   hourEncoder,assembler,lr))

val model = pipeline.fit(trainingDF)

Now, when the model is trained, I want to see the values of
the probabilities for the training set and compute certain
validation metrics like log-loss. But, I am unable to find
this using "model".

The only thing I could find is

model.transform(testDF).select()

Can't I get the metrics on the training set itself, for training-set validation ?

Thanks !!
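
One way to get at this, sketched with the Java API and assuming Spark 1.x ML,
where transform() adds a "probability" column holding an
org.apache.spark.mllib.linalg.Vector of class probabilities: score the training
set and average the per-row log-loss yourself.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.sql.DataFrame;

DataFrame scored = model.transform(trainingDF).select("label", "probability");
JavaRDD<Double> losses = scored.javaRDD().map(row -> {
    double label = row.getDouble(0);
    double p1 = ((Vector) row.get(1)).apply(1);   // probability of the positive class
    double eps = 1e-15;                           // clip to avoid log(0)
    p1 = Math.max(eps, Math.min(1 - eps, p1));
    return -(label * Math.log(p1) + (1 - label) * Math.log(1 - p1));
});
double logLoss = losses.reduce((a, b) -> a + b) / losses.count();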


Re: removing header from csv file

2016-05-03 Thread Abhishek Anand
You can use this function to remove the header from your dataset(applicable
to RDD)

def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    // only the first partition contains the header line, so drop it there
    if (idx == 0) lines.drop(1) else lines
  })
}


Abhi

On Wed, Apr 27, 2016 at 12:55 PM, Marco Mistroni 
wrote:

> If u r using Scala api you can do
> Myrdd.zipwithindex.filter(_._2 >0).map(_._1)
>
> Maybe a little bit complicated but will do the trick
> As per spark CSV, you will get back a data frame which you can reconduct
> to rdd. .
> Hth
> Marco
> On 27 Apr 2016 6:59 am, "nihed mbarek"  wrote:
>
>> You can add a filter with string that you are sure available only in the
>> header
>>
>> Le mercredi 27 avril 2016, Divya Gehlot  a
>> écrit :
>>
>>> yes you can remove the headers by removing the first row
>>>
>>> can first() or head() to do that
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>> On 27 April 2016 at 13:24, Ashutosh Kumar 
>>> wrote:
>>>
 I see there is a library spark-csv which can be used for removing
 header and processing of csv files. But it seems it works with sqlcontext
 only. Is there a way to remove header from csv files without sqlcontext ?

 Thanks
 Ashutosh

>>>
>>>
>>
>> --
>>
>> M'BAREK Med Nihed,
>> Fedora Ambassador, TUNISIA, Northern Africa
>> http://www.nihed.com
>>
>> 
>>
>>
>>


Clear Threshold in Logistic Regression ML Pipeline

2016-05-03 Thread Abhishek Anand
Hi All,

I am trying to build a logistic regression pipeline in ML.

How can I clear the threshold, which by default is 0.5 ? In MLlib I am able
to clear the threshold and get the raw predictions using the
model.clearThreshold() function.


Regards,
Abhi
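
One option with the ML pipeline API, sketched below for the Spark 1.x Java API:
rather than looking for a clearThreshold()-style call, read the "rawPrediction"
and "probability" columns that transform() emits alongside the thresholded
"prediction" column; the 0.5 cut-off itself is exposed as the threshold
parameter on the LogisticRegression stage.

// Raw scores and class probabilities come out of transform() next to "prediction".
DataFrame scored = model.transform(testDF)
    .select("label", "rawPrediction", "probability", "prediction");
scored.show();

// To move the decision cut-off, set the parameter on the stage before fitting.
lr.setThreshold(0.3);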


Fwd: Facing Unusual Behavior with the executors in spark streaming

2016-04-05 Thread Abhishek Anand
Hi ,

Needed inputs for a couple of issue that I am facing in my production
environment.

I am using spark version 1.4.0 spark streaming.

1) It so happens that the worker is lost on a machine and the executor
still shows up in the executor's tab in the UI.

Even when I kill a worker using the kill -9 command, the worker and executor
both die on that machine, but the executor still shows up in the executors tab
on the UI. The number of active tasks sometimes shows negative on that
executor and my job keeps failing with the following exception.

This usually happens when a job is running. When no computation is taking
place on the cluster (i.e. suppose a 1-min batch gets completed in 20 secs)
and I kill the worker, the executor entry is also gone from the UI; but when
I kill the worker while a job is still running, I always run into this issue.


16/04/01 23:54:20 WARN TaskSetManager: Lost task 141.0 in stage 19859.0
(TID 190333, 192.168.33.96): java.io.IOException: Failed to connect to /
192.168.33.97:63276
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /
192.168.33.97:63276
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more



When I relaunch the worker, new executors are added but the dead one's
entry is still there until the application is killed.

2) Another issue is that when the disk becomes full on one of the workers, the
executor becomes unresponsive and the job gets stuck at a particular stage. The
exception that I can see in the executor logs is


 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:315)
at
org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)


As a workaround I have to kill the executor and clear the space on disk; a
new executor is then relaunched by the worker and the failed stages are
recomputed. But, is it really the case that when the disk is full on one
machine my whole application gets stuck ?




This is really becoming a bottleneck and leads to instability of my
production stack.

Please share your insights on this.


Thanks,
Abhi


Timeout in mapWithState

2016-04-04 Thread Abhishek Anand
What exactly is timeout in mapWithState ?

I want the keys to get removed from memory if there is no data
received for that key for 10 minutes.

How can I achieve this in mapWithState ?

Regards,
Abhi
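
A minimal sketch with the Spark 1.6 Java API (stream and function names are
borrowed from the mapWithState threads elsewhere in this archive): the timeout
on StateSpec is an idle timeout, so a key whose state has not been updated for
that duration is passed to the mapping function one last time with
state.isTimingOut() == true and is then removed.

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StateSpec;

myPairDstream.mapWithState(
    StateSpec.function(mappingFunc).timeout(Durations.minutes(10)));

// Inside mappingFunc, guard any state.update()/state.remove() call with
// !state.isTimingOut(), since an expiring key's state can no longer be updated.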


Re: Disk Full on one Worker is leading to Job Stuck and Executor Unresponsive

2016-04-01 Thread Abhishek Anand
Hi Ted,

Any thoughts on this ???

I am getting the same kind of error when I kill a worker on one of the
machines.
Even after killing the worker using kill -9 command, the executor shows up
on the spark UI with negative active tasks.

All the tasks on that worker starts to fail with the following exception.


16/04/01 23:54:20 WARN TaskSetManager: Lost task 141.0 in stage 19859.0
(TID 190333, 192.168.33.96): java.io.IOException: Failed to connect to /
192.168.33.97:63276
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /
192.168.33.97:63276
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more




Cheers !!
Abhi

On Fri, Apr 1, 2016 at 9:04 AM, Abhishek Anand 
wrote:

> This is what I am getting in the executor logs
>
> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
> reverting partial writes to file
> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
> java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:315)
> at
> org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)
>
>
>
> It happens every time the disk is full.
>
> On Fri, Apr 1, 2016 at 2:18 AM, Ted Yu  wrote:
>
>> Can you show the stack trace ?
>>
>> The log message came from
>> DiskBlockObjectWriter#revertPartialWritesAndClose().
>> Unfortunately, the method doesn't throw exception, making it a bit hard
>> for caller to know of the disk full condition.
>>
>> On Thu, Mar 31, 2016 at 11:32 AM, Abhishek Anand > > wrote:
>>
>>>
>>> Hi,
>>>
>>> Why is it so that when my disk space is full on one of the workers then
>>> the executor on that worker becomes unresponsive and the jobs on that
>>> worker fails with the exception
>>>
>>>
>>> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
>>> reverting partial writes to file
>>> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
>>> java.io.IOException: No space left on device
>>>
>>>
>>> This is leading to my job getting stuck.
>>>
>>> As a workaround I have to kill the executor, clear the space on disk and
>>> new executor  relaunched by the worker and the failed stages are recomputed.
>>>
>>>
>>> How can I get rid of this problem i.e why my job get stuck on disk full
>>> issue on one of the workers ?
>>>
>>>
>>> Cheers !!!
>>> Abhi
>>>
>>>
>>
>


Re: Disk Full on one Worker is leading to Job Stuck and Executor Unresponsive

2016-03-31 Thread Abhishek Anand
This is what I am getting in the executor logs

16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:315)
at
org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)



It happens every time the disk is full.

On Fri, Apr 1, 2016 at 2:18 AM, Ted Yu  wrote:

> Can you show the stack trace ?
>
> The log message came from
> DiskBlockObjectWriter#revertPartialWritesAndClose().
> Unfortunately, the method doesn't throw exception, making it a bit hard
> for caller to know of the disk full condition.
>
> On Thu, Mar 31, 2016 at 11:32 AM, Abhishek Anand 
> wrote:
>
>>
>> Hi,
>>
>> Why is it so that when my disk space is full on one of the workers then
>> the executor on that worker becomes unresponsive and the jobs on that
>> worker fails with the exception
>>
>>
>> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
>> reverting partial writes to file
>> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
>> java.io.IOException: No space left on device
>>
>>
>> This is leading to my job getting stuck.
>>
>> As a workaround I have to kill the executor, clear the space on disk and
>> new executor  relaunched by the worker and the failed stages are recomputed.
>>
>>
>> How can I get rid of this problem i.e why my job get stuck on disk full
>> issue on one of the workers ?
>>
>>
>> Cheers !!!
>> Abhi
>>
>>
>


Disk Full on one Worker is leading to Job Stuck and Executor Unresponsive

2016-03-31 Thread Abhishek Anand
Hi,

Why is it that when the disk space is full on one of the workers, the
executor on that worker becomes unresponsive and the jobs on that worker
fail with the following exception ?


16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
java.io.IOException: No space left on device


This is leading to my job getting stuck.

As a workaround I have to kill the executor and clear the space on disk; a
new executor is then relaunched by the worker and the failed stages are recomputed.


How can I get rid of this problem, i.e. why does my job get stuck on a disk-full
issue on one of the workers ?


Cheers !!!
Abhi


Output the data to external database at particular time in spark streaming

2016-03-08 Thread Abhishek Anand
I have a spark streaming job where I am aggregating the data by doing
reduceByKeyAndWindow with an inverse function.

I am keeping the data in memory for up to 2 hours, and in order to output the
reduced data to an external storage I conditionally need to push the data
to the DB, say at every 15th minute of each hour.

How can this be achieved ?


Regards,
Abhi
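
A minimal sketch of the conditional output, using the batch time that
foreachRDD already provides (windowedStream and writeToDb are hypothetical
names standing in for the windowed stream and the external-store writer). With
a 1-minute slide interval, exactly one batch per hour lands on minute 15.

import java.util.Calendar;
import java.util.Date;

windowedStream.foreachRDD((rdd, time) -> {
    Calendar cal = Calendar.getInstance();
    cal.setTime(new Date(time.milliseconds()));
    if (cal.get(Calendar.MINUTE) == 15) {            // only the 15th minute of each hour
        rdd.foreachPartition(it -> writeToDb(it));   // hypothetical per-partition sink
    }
    return null;                                     // Function2<..., Time, Void> in Spark 1.x
});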


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-29 Thread Abhishek Anand
Hi Ryan,

It's not working even after removing the reduceByKey.

So, basically I am doing the following
- reading from kafka
- flatmap inside transform
- mapWithState
- rdd.count on output of mapWithState

But to my surprise I still don't see checkpointing taking place.

Is there any restriction to the type of operation that we can perform
inside mapWithState ?

I really need to resolve this one, as currently if my application is restarted
from checkpoint it has to repartition 120 previous stages, which takes a very
long time.

Thanks !!
Abhi

On Mon, Feb 29, 2016 at 3:42 AM, Shixiong(Ryan) Zhu  wrote:

> Sorry that I forgot to tell you that you should also call `rdd.count()`
> for "reduceByKey" as well. Could you try it and see if it works?
>
> On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand 
> wrote:
>
>> Hi Ryan,
>>
>> I am using mapWithState after doing reduceByKey.
>>
>> I am right now using mapWithState as you suggested and triggering the
>> count manually.
>>
>> But, still unable to see any checkpointing taking place. In the DAG I can
>> see that the reduceByKey operation for the previous batches are also being
>> computed.
>>
>>
>> Thanks
>> Abhi
>>
>>
>> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Abhi,
>>>
>>> Using reducebykeyandwindow and mapWithState will trigger the bug
>>> in SPARK-6847. Here is a workaround to trigger checkpoint manually:
>>>
>>> JavaMapWithStateDStream<...> stateDStream =
>>> myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>>> stateDStream.foreachRDD(new Function1<...>() {
>>>   @Override
>>>   public Void call(JavaRDD<...> rdd) throws Exception {
>>> rdd.count();
>>>   }
>>> });
>>> return stateDStream.stateSnapshots();
>>>
>>>
>>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Hi Ryan,
>>>>
>>>> Reposting the code.
>>>>
>>>> Basically my use case is something like - I am receiving the web
>>>> impression logs and may get the notify (listening from kafka) for those
>>>> impressions in the same interval (for me its 1 min) or any next interval
>>>> (upto 2 hours). Now, when I receive notify for a particular impression I
>>>> need to swap the date field in impression with the date field in notify
>>>> logs. The notify for an impression has the same key as impression.
>>>>
>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>> Tuple2<String, MyClass>> mappingFunc =
>>>> new Function3<String, Optional<MyClass>, State<MyClass>,
>>>> Tuple2<String, MyClass>>() {
>>>> @Override
>>>> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>> State<MyClass> state) {
>>>> MyClass nullObj = new MyClass();
>>>> nullObj.setImprLog(null);
>>>> nullObj.setNotifyLog(null);
>>>> MyClass current = one.or(nullObj);
>>>>
>>>> if(current!= null && current.getImprLog() != null &&
>>>> current.getMyClassType() == 1 /*this is impression*/){
>>>> return new Tuple2<>(key, null);
>>>> }
>>>> else if (current.getNotifyLog() != null  && current.getMyClassType() ==
>>>> 3 /*notify for the impression received*/){
>>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>  //swappping the dates
>>>> return new Tuple2<>(key, oldState);
>>>> }
>>>> else{
>>>> return new Tuple2<>(key, null);
>>>> }
>>>> }
>>>> else{
>>>> return new Tuple2<>(key, null);
>>>> }
>>>>
>>>> }
>>>> };
>>>>
>>>>
>>>> return
>>>> myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>>>
>>>>
>>>> Currently I am using reduceByKeyAndWindow without the inverse function
>>>> and I am able to get the correct data. But, the issue that might arise is when I
>>>> have to restart my application from checkpoint and it repartitions and
>>>> recomputes the previous 120 partitions, which delays the incoming batches.
>>>>
>>>>
>>>> Thanks !!
>>>> Abh

Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-29 Thread Abhishek Anand
Hi Ryan,

I was able to resolve this issue. The /tmp location was mounted with the
"noexec" option. Removing this noexec in the fstab resolved the issue. The
Snappy shared object file is created at the /tmp location, so either
removing the noexec from the mount or changing the default temp location solves
the issue.

export _JAVA_OPTIONS=-Djava.io.tmpdir=/mycustometemplocation



Thanks !!
Abhi


On Mon, Feb 29, 2016 at 3:46 AM, Shixiong(Ryan) Zhu  wrote:

> This is because the Snappy library cannot load the native library. Did you
> forget to install the snappy native library in your new machines?
>
> On Fri, Feb 26, 2016 at 11:05 PM, Abhishek Anand 
> wrote:
>
>> Any insights on this ?
>>
>> On Fri, Feb 26, 2016 at 1:21 PM, Abhishek Anand 
>> wrote:
>>
>>> On changing the default compression codec which is snappy to lzf the
>>> errors are gone !!
>>>
>>> How can I fix this using snappy as the codec ?
>>>
>>> Is there any downside of using lzf as snappy is the default codec that
>>> ships with spark.
>>>
>>>
>>> Thanks !!!
>>> Abhi
>>>
>>> On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand >> > wrote:
>>>
>>>> Hi ,
>>>>
>>>> I am getting the following exception on running my spark streaming job.
>>>>
>>>> The same job has been running fine since long and when I added two new
>>>> machines to my cluster I see the job failing with the following exception.
>>>>
>>>>
>>>>
>>>> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
>>>> (TID 22594)
>>>> java.io.IOException: java.lang.reflect.InvocationTargetException
>>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>>>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>> at
>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>>>> at
>>>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
>>>> at
>>>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
>>>> at org.apache.spark.broadcast.TorrentBroadcast.org
>>>> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
>>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>>>> ... 11 more
>>>> Caused by: java.lang.IllegalArgumentException
>>>> at
>>>> org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:152)
>>>> ... 20 more
>>>>
>>>>
>>>>
>>>> Thanks !!!
>>>> Abhi
>>>>
>>>
>>>
>>
>


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-27 Thread Abhishek Anand
Hi Ryan,

I am using mapWithState after doing reduceByKey.

I am right now using mapWithState as you suggested and triggering the count
manually.

But, still unable to see any checkpointing taking place. In the DAG I can
see that the reduceByKey operation for the previous batches are also being
computed.


Thanks
Abhi


On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu  wrote:

> Hey Abhi,
>
> Using reducebykeyandwindow and mapWithState will trigger the bug
> in SPARK-6847. Here is a workaround to trigger checkpoint manually:
>
> JavaMapWithStateDStream<...> stateDStream =
> myPairDstream.mapWithState(StateSpec.function(mappingFunc));
> stateDStream.foreachRDD(new Function1<...>() {
>   @Override
>   public Void call(JavaRDD<...> rdd) throws Exception {
> rdd.count();
>   }
> });
> return stateDStream.stateSnapshots();
>
>
> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand 
> wrote:
>
>> Hi Ryan,
>>
>> Reposting the code.
>>
>> Basically my use case is something like - I am receiving the web
>> impression logs and may get the notify (listening from kafka) for those
>> impressions in the same interval (for me its 1 min) or any next interval
>> (upto 2 hours). Now, when I receive notify for a particular impression I
>> need to swap the date field in impression with the date field in notify
>> logs. The notify for an impression has the same key as impression.
>>
>> static Function3<String, Optional<MyClass>, State<MyClass>,
>> Tuple2<String, MyClass>> mappingFunc =
>> new Function3<String, Optional<MyClass>, State<MyClass>,
>> Tuple2<String, MyClass>>() {
>> @Override
>> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>> State<MyClass> state) {
>> MyClass nullObj = new MyClass();
>> nullObj.setImprLog(null);
>> nullObj.setNotifyLog(null);
>> MyClass current = one.or(nullObj);
>>
>> if(current!= null && current.getImprLog() != null &&
>> current.getMyClassType() == 1 /*this is impression*/){
>> return new Tuple2<>(key, null);
>> }
>> else if (current.getNotifyLog() != null  && current.getMyClassType() == 3
>> /*notify for the impression received*/){
>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>> if(oldState!= null && oldState.getNotifyLog() != null){
>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>  //swappping the dates
>> return new Tuple2<>(key, oldState);
>> }
>> else{
>> return new Tuple2<>(key, null);
>> }
>> }
>> else{
>> return new Tuple2<>(key, null);
>> }
>>
>> }
>> };
>>
>>
>> return
>> myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>
>>
>> Currently I am using reduceByKeyAndWindow without the inverse function
>> and I am able to get the correct data. But, the issue that might arise is when I
>> have to restart my application from checkpoint and it repartitions and
>> recomputes the previous 120 partitions, which delays the incoming batches.
>>
>>
>> Thanks !!
>> Abhi
>>
>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Abhi,
>>>
>>> Could you post how you use mapWithState? By default, it should do
>>> checkpointing every 10 batches.
>>> However, there is a known issue that prevents mapWithState from
>>> checkpointing in some special cases:
>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>
>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand >> > wrote:
>>>
>>>> Any Insights on this one ?
>>>>
>>>>
>>>> Thanks !!!
>>>> Abhi
>>>>
>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> I am now trying to use mapWithState in the following way using some
>>>>> example codes. But, by looking at the DAG it does not seem to checkpoint
>>>>> the state and when restarting the application from checkpoint, it
>>>>> re-partitions all the previous batches data from kafka.
>>>>>
>>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>> Tuple2<String, MyClass>> mappingFunc =
>>>>> new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>> Tuple2<String, MyClass>>() {
>>>>> @Override
>>>>> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>> State<MyClass> state) {
>>>>> MyClass nullObj = new MyClass();
>>>>> nullObj.setImprLog(null);
>>>>> nullObj.setNotif

Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-26 Thread Abhishek Anand
Any insights on this ?

On Fri, Feb 26, 2016 at 1:21 PM, Abhishek Anand 
wrote:

> On changing the default compression codec which is snappy to lzf the
> errors are gone !!
>
> How can I fix this using snappy as the codec ?
>
> Is there any downside of using lzf as snappy is the default codec that
> ships with spark.
>
>
> Thanks !!!
> Abhi
>
> On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand 
> wrote:
>
>> Hi ,
>>
>> I am getting the following exception on running my spark streaming job.
>>
>> The same job has been running fine since long and when I added two new
>> machines to my cluster I see the job failing with the following exception.
>>
>>
>>
>> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
>> (TID 22594)
>> java.io.IOException: java.lang.reflect.InvocationTargetException
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>> at
>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
>> at
>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
>> at org.apache.spark.broadcast.TorrentBroadcast.org
>> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>> ... 11 more
>> Caused by: java.lang.IllegalArgumentException
>> at
>> org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:152)
>> ... 20 more
>>
>>
>>
>> Thanks !!!
>> Abhi
>>
>
>


Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-25 Thread Abhishek Anand
On changing the default compression codec from snappy to lzf, the errors
are gone !!

How can I fix this using snappy as the codec ?

Is there any downside of using lzf as snappy is the default codec that
ships with spark.


Thanks !!!
Abhi

On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand 
wrote:

> Hi ,
>
> I am getting the following exception on running my spark streaming job.
>
> The same job has been running fine since long and when I added two new
> machines to my cluster I see the job failing with the following exception.
>
>
>
> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
> (TID 22594)
> java.io.IOException: java.lang.reflect.InvocationTargetException
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
> at
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
> at
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
> at org.apache.spark.broadcast.TorrentBroadcast.org
> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
> ... 11 more
> Caused by: java.lang.IllegalArgumentException
> at
> org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:152)
> ... 20 more
>
>
>
> Thanks !!!
> Abhi
>


Query Kafka Partitions from Spark SQL

2016-02-23 Thread Abhishek Anand
Is there a way to query the JSON (or any other format) data stored in Kafka
using Spark SQL by providing the offset range on each of the brokers ?

I just want to be able to query all the partitions in a SQL manner.

Thanks !
Abhi
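
A minimal sketch assuming Spark 1.x with the spark-streaming-kafka artifact on
the classpath; broker, topic and offset values are placeholders, and jsc and
sqlContext are an existing JavaSparkContext and SQLContext. A batch KafkaRDD is
built from explicit offset ranges and then queried through Spark SQL.

import java.util.HashMap;
import java.util.Map;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");

OffsetRange[] ranges = {
    OffsetRange.create("mytopic", 0, 0L, 1000L),   // topic, partition, from, until
    OffsetRange.create("mytopic", 1, 0L, 1000L)
};

JavaRDD<String> json = KafkaUtils.createRDD(
        jsc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, ranges)
    .map(t -> t._2());                             // keep only the message payload

DataFrame df = sqlContext.read().json(json);
df.registerTempTable("kafka_batch");
sqlContext.sql("SELECT count(*) FROM kafka_batch").show();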


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-22 Thread Abhishek Anand
Hi Ryan,

Reposting the code.

Basically my use case is something like this - I am receiving the web impression
logs and may get the notify (listening from kafka) for those impressions in
the same interval (for me it's 1 min) or any later interval (up to 2 hours).
Now, when I receive the notify for a particular impression I need to swap the
date field in the impression with the date field in the notify logs. The notify
for an impression has the same key as the impression.

static Function3<String, Optional<MyClass>, State<MyClass>,
Tuple2<String, MyClass>> mappingFunc =
new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
@Override
public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
State<MyClass> state) {
MyClass nullObj = new MyClass();
nullObj.setImprLog(null);
nullObj.setNotifyLog(null);
MyClass current = one.or(nullObj);

if(current!= null && current.getImprLog() != null &&
current.getMyClassType() == 1 /*this is impression*/){
return new Tuple2<>(key, null);
}
else if (current.getNotifyLog() != null  && current.getMyClassType() == 3
/*notify for the impression received*/){
MyClass oldState = (state.exists() ? state.get() : nullObj);
if(oldState!= null && oldState.getNotifyLog() != null){
oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
 //swappping the dates
return new Tuple2<>(key, oldState);
}
else{
return new Tuple2<>(key, null);
}
}
else{
return new Tuple2<>(key, null);
}

}
};


return
myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();


Currently I am using reduceByKeyAndWindow without the inverse function and
I am able to get the correct data. But, the issue that might arise is when I
have to restart my application from checkpoint and it repartitions and
recomputes the previous 120 partitions, which delays the incoming batches.


Thanks !!
Abhi

On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu  wrote:

> Hey Abhi,
>
> Could you post how you use mapWithState? By default, it should do
> checkpointing every 10 batches.
> However, there is a known issue that prevents mapWithState from
> checkpointing in some special cases:
> https://issues.apache.org/jira/browse/SPARK-6847
>
> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand 
> wrote:
>
>> Any Insights on this one ?
>>
>>
>> Thanks !!!
>> Abhi
>>
>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand > > wrote:
>>
>>> I am now trying to use mapWithState in the following way using some
>>> example codes. But, by looking at the DAG it does not seem to checkpoint
>>> the state and when restarting the application from checkpoint, it
>>> re-partitions all the previous batches data from kafka.
>>>
>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>> Tuple2<String, MyClass>> mappingFunc =
>>> new Function3<String, Optional<MyClass>, State<MyClass>,
>>> Tuple2<String, MyClass>>() {
>>> @Override
>>> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>> State<MyClass> state) {
>>> MyClass nullObj = new MyClass();
>>> nullObj.setImprLog(null);
>>> nullObj.setNotifyLog(null);
>>> MyClass current = one.or(nullObj);
>>>
>>> if(current!= null && current.getImprLog() != null &&
>>> current.getMyClassType() == 1){
>>> return new Tuple2<>(key, null);
>>> }
>>> else if (current.getNotifyLog() != null  && current.getMyClassType() ==
>>> 3){
>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>> return new Tuple2<>(key, oldState);
>>> }
>>> else{
>>> return new Tuple2<>(key, null);
>>> }
>>> }
>>> else{
>>> return new Tuple2<>(key, null);
>>> }
>>>
>>> }
>>> };
>>>
>>>
>>> Please suggest if this is the proper way or am I doing something wrong.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu 
>>> wrote:
>>>
>>>> If you don't want to update your only option will be updateStateByKey
>>>> then
>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu"  wrote:
>>>>
>>>>> mapWithState supports checkpoint.
>>>>>
>>>>> There has been some bug fix since release of 1.6.0
>>>>> e.g.
>>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>> with KryoSerializer
>>>>>
>>>>> which is in the upcoming 1.6.1
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek An

java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-22 Thread Abhishek Anand
Hi ,

I am getting the following exception on running my spark streaming job.

The same job has been running fine for a long time, but when I added two new
machines to my cluster I see the job failing with the following exception.



16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
(TID 22594)
java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
at
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
at org.apache.spark.broadcast.TorrentBroadcast.org
$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
... 11 more
Caused by: java.lang.IllegalArgumentException
at
org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:152)
... 20 more



Thanks !!!
Abhi


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-22 Thread Abhishek Anand
Any Insights on this one ?


Thanks !!!
Abhi

On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand 
wrote:

> I am now trying to use mapWithState in the following way using some
> example codes. But, by looking at the DAG it does not seem to checkpoint
> the state and when restarting the application from checkpoint, it
> re-partitions all the previous batches data from kafka.
>
> static Function3<String, Optional<MyClass>, State<MyClass>,
> Tuple2<String, MyClass>> mappingFunc =
> new Function3<String, Optional<MyClass>, State<MyClass>,
> Tuple2<String, MyClass>>() {
> @Override
> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
> State<MyClass> state) {
> MyClass nullObj = new MyClass();
> nullObj.setImprLog(null);
> nullObj.setNotifyLog(null);
> MyClass current = one.or(nullObj);
>
> if(current!= null && current.getImprLog() != null &&
> current.getMyClassType() == 1){
> return new Tuple2<>(key, null);
> }
> else if (current.getNotifyLog() != null  && current.getMyClassType() == 3){
> MyClass oldState = (state.exists() ? state.get() : nullObj);
> if(oldState!= null && oldState.getNotifyLog() != null){
> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
> return new Tuple2<>(key, oldState);
> }
> else{
> return new Tuple2<>(key, null);
> }
> }
> else{
> return new Tuple2<>(key, null);
> }
>
> }
> };
>
>
> Please suggest if this is the proper way or am I doing something wrong.
>
>
> Thanks !!
> Abhi
>
> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu 
> wrote:
>
>> If you don't want to update your only option will be updateStateByKey then
>> On 13 Feb 2016 8:48 p.m., "Ted Yu"  wrote:
>>
>>> mapWithState supports checkpoint.
>>>
>>> There has been some bug fix since release of 1.6.0
>>> e.g.
>>>   SPARK-12591 NullPointerException using checkpointed mapWithState with
>>> KryoSerializer
>>>
>>> which is in the upcoming 1.6.1
>>>
>>> Cheers
>>>
>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Does mapWithState checkpoints the data ?
>>>>
>>>> When my application goes down and is restarted from checkpoint, will
>>>> mapWithState need to recompute the previous batches data ?
>>>>
>>>> Also, to use mapWithState I will need to upgrade my application, as I am
>>>> using version 1.4.0 and mapWithState isn't supported there. Is there any
>>>> other workaround ?
>>>>
>>>> Cheers!!
>>>> Abhi
>>>>
>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu >>> > wrote:
>>>>
>>>>> Looks like mapWithState could help you?
>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" 
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have a use case as follows in my production environment, where I
>>>>>> am listening from kafka with a slideInterval of 1 min and a windowLength of 2
>>>>>> hours.
>>>>>>
>>>>>> I have a JavaPairDStream where for each key I am getting the same key
>>>>>> but with a different value, which might appear in the same batch or some
>>>>>> later batch.
>>>>>>
>>>>>> When the key appears second time I need to update a field in value of
>>>>>> previous key with a field in the later key. The keys for which the
>>>>>> combination keys do not come should be rejected after 2 hours.
>>>>>>
>>>>>> At the end of each second I need to output the result to external
>>>>>> database.
>>>>>>
>>>>>> For example :
>>>>>>
>>>>>> Suppose valueX is object of MyClass with fields int a, String b
>>>>>> At t=1sec I am getting
>>>>>> key0,value0(0,"prev0")
>>>>>> key1,value1 (1, "prev1")
>>>>>> key2,value2 (2,"prev2")
>>>>>> key2,value3 (3, "next2")
>>>>>>
>>>>>> Output to database after 1 sec
>>>>>> key2, newValue (2,"next2")
>>>>>>
>>>>>> At t=2 sec getting
>>>>>> key3,value4(4,"prev3")
>>>>>> key1,value5(5,"next1")
>>>>>>
>>>>>> Output to database after 2 sec
>>>>>> key1,newValue(1,"next1")
>>>>>>
>>>>>> At t=3 sec
>>>>>> key4,value6(6,"prev4")
>>>>>> key3,value7(7,"next3")
>>>>>> key5,value5(8,"prev5")
>>>>>> key5,value5(9,"next5")
>>>>>> key0,value0(10,"next0")
>>>>>>
>>>>>> Output to database after 3 sec
>>>>>> key0,newValue(0,"next0")
>>>>>> key3,newValue(4,"next3")
>>>>>> key5,newValue(8,"next5")
>>>>>>
>>>>>>
>>>>>> Please suggest how this can be achieved.
>>>>>>
>>>>>>
>>>>>> Thanks a lot 
>>>>>> Abhi
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>


Spark Streaming with Kafka Use Case

2016-02-17 Thread Abhishek Anand
I have a spark streaming application running in production. I am trying to
find a solution for a particular use case where my application has a
downtime of say 5 hours and is then restarted. Now, when I start my streaming
application after 5 hours there would be a considerable amount of data
in Kafka, and my cluster would be unable to partition and process it all at once.

Is there any workaround so that when my streaming application starts, it
takes the data for 1-2 hours and processes it, then takes the data for the next
1 hour and processes it ? Once it is done processing the previous 5 hours of
data that it missed, normal streaming should resume with the given slide interval.

Please suggest any ideas, and whether this would be feasible.


Thanks !!
Abhi
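
For reference, one knob that is often suggested for exactly this catch-up
situation with the direct Kafka stream is spark.streaming.kafka.maxRatePerPartition
(and, from Spark 1.5 onwards, the backpressure setting). A minimal sketch,
where the rate value is only a placeholder to be tuned for the cluster:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("streaming-app")
    // Cap the records read per Kafka partition per second so the first
    // batches after a long downtime stay a manageable size.
    .set("spark.streaming.kafka.maxRatePerPartition", "10000")
    // Spark 1.5+: adapt the ingestion rate to the observed processing times.
    .set("spark.streaming.backpressure.enabled", "true");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(1));

With the rate capped, the backlog is consumed over several normal-sized
batches instead of one huge first batch, which is close to the behaviour
described above.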


Re: Worker's BlockManager Folder not getting cleared

2016-02-17 Thread Abhishek Anand
Still looking for an answer to this.

Is it safe to delete the older files using

find . -type f -cmin +200 -name "shuffle*" -exec rm -rf {} \;

For a window duration of 2 hours, how old do the files have to be before we
can delete them?

Thanks.
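
For reference, two settings that are sometimes suggested for this on Spark
1.x instead of deleting files by hand (the values below are placeholders):
spark.cleaner.ttl, which periodically cleans metadata and old shuffle data,
and, on standalone workers, the spark.worker.cleanup.* options set through
SPARK_WORKER_OPTS, which only remove the work directories of finished
applications. A minimal sketch of the application-side setting:

SparkConf conf = new SparkConf()
    .setAppName("streaming-app")
    // Clean metadata and shuffle data older than this many seconds.
    // Must comfortably exceed the longest window (2 hours here), otherwise
    // windowed data may be cleaned while it is still needed.
    .set("spark.cleaner.ttl", "10800");   // 3 hours, as an example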

On Sun, Feb 14, 2016 at 12:34 PM, Abhishek Anand 
wrote:

> Hi All,
>
> Any ideas on this one ?
>
> The size of this directory keeps on growing.
>
> I can see there are many files from a day earlier too.
>
> Cheers !!
> Abhi
>
> On Tue, Jan 26, 2016 at 7:13 PM, Abhishek Anand 
> wrote:
>
>> Hi Adrian,
>>
>> I am running spark in standalone mode.
>>
>> The spark version that I am using is 1.4.0
>>
>> Thanks,
>> Abhi
>>
>> On Tue, Jan 26, 2016 at 4:10 PM, Adrian Bridgett 
>> wrote:
>>
>>> Hi Abhi - are you running on Mesos perchance?
>>>
>>> If so then with spark <1.6 you will be hitting
>>> https://issues.apache.org/jira/browse/SPARK-10975
>>> With spark >= 1.6:
>>> https://issues.apache.org/jira/browse/SPARK-12430
>>> and also be aware of:
>>> https://issues.apache.org/jira/browse/SPARK-12583
>>>
>>>
>>> On 25/01/2016 07:14, Abhishek Anand wrote:
>>>
>>> Hi All,
>>>
>>> How long the shuffle files and data files are stored on the block
>>> manager folder of the workers.
>>>
>>> I have a spark streaming job with window duration of 2 hours and slide
>>> interval of 15 minutes.
>>>
>>> When I execute the following command in my block manager path
>>>
>>> find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;
>>>
>>> I see a lot of files which means that they are not getting cleared which
>>> I was expecting that they should get cleared.
>>>
>>> Subsequently, this size keeps on increasing and takes space on the disk.
>>>
>>> Please suggest how to get rid of this and help on understanding this
>>> behaviour.
>>>
>>>
>>>
>>> Thanks !!!
>>> Abhi
>>>
>>>
>>> --
>>> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
>>> <http://www.opensignal.com>
>>> _
>>> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
>>> Phone #: +44 777-377-8251
>>> Skype: abridgett  |  @adrianbridgett <http://twitter.com/adrianbridgett>
>>>   |  LinkedIn link  <https://uk.linkedin.com/in/abridgett>
>>> _
>>>
>>
>>
>


Re: Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming

2016-02-16 Thread Abhishek Anand
Hi Cody,

I am able to do this using the following piece of code:

kafkaStreamRdd.foreachRDD((rdd, batchMilliSec) -> {
    Date currentBatchTime = new Date();
    currentBatchTime.setTime(batchMilliSec.milliseconds());
    List r = new ArrayList();
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    for (int partition = 0; partition < offsetRanges.length; partition++) {
        // Add offsets to the list
    }
    JavaSparkContext ctx = new JavaSparkContext(rdd.context());
    JavaRDD currentBatchOffsets = ctx.parallelize(r);
    // write currentBatchOffsets rdd to cassandra
    return null;
});


Is this the correct way of doing this ?


Thanks !!
Abhi

On Tue, Feb 16, 2016 at 9:31 PM, Cody Koeninger  wrote:

> You could use sc.parallelize... but the offsets are already available at
> the driver, and they're a (hopefully) small enough amount of data that
> it's probably more straightforward to just use the normal cassandra client
> to save them from the driver.
>
> On Tue, Feb 16, 2016 at 1:15 AM, Abhishek Anand 
> wrote:
>
>> I have a kafka rdd and I need to save the offsets to cassandra table at
>> the begining of each batch.
>>
>> Basically I need to write the offsets of the type Offsets below that I am
>> getting inside foreachRD, to cassandra. The javafunctions api to write to
>> cassandra needs a rdd. How can I create a rdd from offsets and write to
>> cassandra table.
>>
>>
>> public static void writeOffsets(JavaPairDStream<String, String> kafkastream){
>> kafkastream.foreachRDD((rdd,batchMilliSec) -> {
>> OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>> return null;
>> });
>>
>>
>> Thanks !!
>> Abhi
>>
>>
>>
>
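
For reference, a minimal sketch of the driver-side approach Cody describes
above, using the DataStax Java driver directly instead of building an RDD.
It assumes kafkaStream is the JavaPairDStream returned by createDirectStream;
the contact point, keyspace and table names are hypothetical, and in a real
job the Cluster/Session would be created once and reused rather than per batch:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

kafkaStream.foreachRDD((rdd, batchTime) -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
         Session session = cluster.connect("mykeyspace")) {
        for (OffsetRange o : offsetRanges) {
            session.execute(
                "INSERT INTO kafka_offsets (topic, kafka_partition, from_offset, until_offset, batch_time) "
                    + "VALUES (?, ?, ?, ?, ?)",
                o.topic(), o.partition(), o.fromOffset(), o.untilOffset(),
                new java.util.Date(batchTime.milliseconds()));
        }
    }
    return null;
});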


Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming

2016-02-15 Thread Abhishek Anand
I have a Kafka RDD and I need to save the offsets to a Cassandra table at
the beginning of each batch.

Basically, I need to write the offsets of type OffsetRange below, which I am
getting inside foreachRDD, to Cassandra. The javaFunctions API used to write
to Cassandra needs an RDD. How can I create an RDD from the offsets and
write it to a Cassandra table?


public static void writeOffsets(JavaPairDStream<String, String> kafkastream) {
    kafkastream.foreachRDD((rdd, batchMilliSec) -> {
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        return null;
    });
}


Thanks !!
Abhi
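
One way to do this with the spark-cassandra-connector Java API is to copy
the OffsetRange values into a small serializable bean, parallelize the list
on the driver, and write it with javaFunctions(...).writerBuilder(...). A
rough sketch, with hypothetical keyspace, table and bean names:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;
import com.datastax.spark.connector.japi.CassandraJavaUtil;

public class OffsetBookmark implements Serializable {
    private String topic;
    private int partition;
    private long fromOffset;
    private long untilOffset;

    public OffsetBookmark() { }
    public OffsetBookmark(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
    public String getTopic() { return topic; }
    public int getPartition() { return partition; }
    public long getFromOffset() { return fromOffset; }
    public long getUntilOffset() { return untilOffset; }
}

// Inside foreachRDD, on the driver:
OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
List<OffsetBookmark> bookmarks = new ArrayList<>();
for (OffsetRange o : ranges) {
    bookmarks.add(new OffsetBookmark(o.topic(), o.partition(), o.fromOffset(), o.untilOffset()));
}
JavaRDD<OffsetBookmark> offsetsRdd = new JavaSparkContext(rdd.context()).parallelize(bookmarks);
CassandraJavaUtil.javaFunctions(offsetsRdd)
    .writerBuilder("mykeyspace", "kafka_offsets", CassandraJavaUtil.mapToRow(OffsetBookmark.class))
    .saveToCassandra();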


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-15 Thread Abhishek Anand
I am now trying to use mapWithState in the following way, based on some
example code. But, looking at the DAG, it does not seem to checkpoint the
state, and when restarting the application from a checkpoint it
re-partitions all of the previous batches' data from Kafka.

static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
    new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
        @Override
        public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
                State<MyClass> state) {
            MyClass nullObj = new MyClass();
            nullObj.setImprLog(null);
            nullObj.setNotifyLog(null);
            MyClass current = one.or(nullObj);

            if (current != null && current.getImprLog() != null &&
                    current.getMyClassType() == 1) {
                return new Tuple2<>(key, null);
            }
            else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
                MyClass oldState = (state.exists() ? state.get() : nullObj);
                if (oldState != null && oldState.getNotifyLog() != null) {
                    oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
                    return new Tuple2<>(key, oldState);
                }
                else {
                    return new Tuple2<>(key, null);
                }
            }
            else {
                return new Tuple2<>(key, null);
            }
        }
    };


Please suggest whether this is the proper way, or whether I am doing something wrong.


Thanks !!
Abhi
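
Assuming the input is a JavaPairDStream<String, MyClass>, a minimal sketch
of wiring the mapping function above into mapWithState with a 2-hour
timeout is shown below. One thing that stands out in the function above is
that it never calls state.update(...), so as written nothing would actually
be stored in the state between batches:

import scala.Tuple2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;

StateSpec<String, MyClass, MyClass, Tuple2<String, MyClass>> spec =
    StateSpec.function(mappingFunc)
             .timeout(Durations.minutes(120));   // drop keys that see no update for 2 hours

JavaMapWithStateDStream<String, MyClass, MyClass, Tuple2<String, MyClass>> stateStream =
    pairStream.mapWithState(spec);

// mapWithState requires checkpointing to be enabled on the streaming context.
jssc.checkpoint("hdfs://namenode:8020/checkpoints/my-app");   // hypothetical path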

On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu 
wrote:

> If you don't want to update your only option will be updateStateByKey then
> On 13 Feb 2016 8:48 p.m., "Ted Yu"  wrote:
>
>> mapWithState supports checkpoint.
>>
>> There has been some bug fix since release of 1.6.0
>> e.g.
>>   SPARK-12591 NullPointerException using checkpointed mapWithState with
>> KryoSerializer
>>
>> which is in the upcoming 1.6.1
>>
>> Cheers
>>
>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand > > wrote:
>>
>>> Does mapWithState checkpoints the data ?
>>>
>>> When my application goes down and is restarted from checkpoint, will
>>> mapWithState need to recompute the previous batches data ?
>>>
>>> Also, to use mapWithState I will need to upgrade my application as I am
>>> using version 1.4.0 and mapWithState isnt supported there. Is there any
>>> other work around ?
>>>
>>> Cheers!!
>>> Abhi
>>>
>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu 
>>> wrote:
>>>
>>>> Looks like mapWithState could help you?
>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have an use case like follows in my production environment where I
>>>>> am listening from kafka with slideInterval of 1 min and windowLength of 2
>>>>> hours.
>>>>>
>>>>> I have a JavaPairDStream where for each key I am getting the same key
>>>>> but with different value,which might appear in the same batch or some next
>>>>> batch.
>>>>>
>>>>> When the key appears second time I need to update a field in value of
>>>>> previous key with a field in the later key. The keys for which the
>>>>> combination keys do not come should be rejected after 2 hours.
>>>>>
>>>>> At the end of each second I need to output the result to external
>>>>> database.
>>>>>
>>>>> For example :
>>>>>
>>>>> Suppose valueX is object of MyClass with fields int a, String b
>>>>> At t=1sec I am getting
>>>>> key0,value0(0,"prev0")
>>>>> key1,value1 (1, "prev1")
>>>>> key2,value2 (2,"prev2")
>>>>> key2,value3 (3, "next2")
>>>>>
>>>>> Output to database after 1 sec
>>>>> key2, newValue (2,"next2")
>>>>>
>>>>> At t=2 sec getting
>>>>> key3,value4(4,"prev3")
>>>>> key1,value5(5,"next1")
>>>>>
>>>>> Output to database after 2 sec
>>>>> key1,newValue(1,"next1")
>>>>>
>>>>> At t=3 sec
>>>>> key4,value6(6,"prev4")
>>>>> key3,value7(7,"next3")
>>>>> key5,value5(8,"prev5")
>>>>> key5,value5(9,"next5")
>>>>> key0,value0(10,"next0")
>>>>>
>>>>> Output to database after 3 sec
>>>>> key0,newValue(0,"next0")
>>>>> key3,newValue(4,"next3")
>>>>> key5,newValue(8,"next5")
>>>>>
>>>>>
>>>>> Please suggest how this can be achieved.
>>>>>
>>>>>
>>>>> Thanks a lot 
>>>>> Abhi
>>>>>
>>>>>
>>>>>
>>>
>>


Re: Worker's BlockManager Folder not getting cleared

2016-02-13 Thread Abhishek Anand
Hi All,

Any ideas on this one ?

The size of this directory keeps on growing.

I can see there are many files from a day earlier too.

Cheers !!
Abhi

On Tue, Jan 26, 2016 at 7:13 PM, Abhishek Anand 
wrote:

> Hi Adrian,
>
> I am running spark in standalone mode.
>
> The spark version that I am using is 1.4.0
>
> Thanks,
> Abhi
>
> On Tue, Jan 26, 2016 at 4:10 PM, Adrian Bridgett 
> wrote:
>
>> Hi Abhi - are you running on Mesos perchance?
>>
>> If so then with spark <1.6 you will be hitting
>> https://issues.apache.org/jira/browse/SPARK-10975
>> With spark >= 1.6:
>> https://issues.apache.org/jira/browse/SPARK-12430
>> and also be aware of:
>> https://issues.apache.org/jira/browse/SPARK-12583
>>
>>
>> On 25/01/2016 07:14, Abhishek Anand wrote:
>>
>> Hi All,
>>
>> How long the shuffle files and data files are stored on the block manager
>> folder of the workers.
>>
>> I have a spark streaming job with window duration of 2 hours and slide
>> interval of 15 minutes.
>>
>> When I execute the following command in my block manager path
>>
>> find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;
>>
>> I see a lot of files which means that they are not getting cleared which
>> I was expecting that they should get cleared.
>>
>> Subsequently, this size keeps on increasing and takes space on the disk.
>>
>> Please suggest how to get rid of this and help on understanding this
>> behaviour.
>>
>>
>>
>> Thanks !!!
>> Abhi
>>
>>
>> --
>> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
>> <http://www.opensignal.com>
>> _
>> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
>> Phone #: +44 777-377-8251
>> Skype: abridgett  |  @adrianbridgett <http://twitter.com/adrianbridgett>
>>   |  LinkedIn link  <https://uk.linkedin.com/in/abridgett>
>> _
>>
>
>


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-13 Thread Abhishek Anand
Does mapWithState checkpoint the data ?

When my application goes down and is restarted from a checkpoint, will
mapWithState need to recompute the previous batches' data ?

Also, to use mapWithState I will need to upgrade my application, as I am
using version 1.4.0 and mapWithState isn't supported there. Is there any
other workaround ?

Cheers!!
Abhi
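
On 1.4.0, a workaround along the lines Sebastian mentions below is
updateStateByKey. A rough sketch, assuming pairStream is a
JavaPairDStream<String, MyClass>; the setCrtd/getCrtd calls are hypothetical
stand-ins for the real pairing logic, and Optional here is
com.google.common.base.Optional, the one the 1.x Java API uses. Since
updateStateByKey has no built-in timeout, the 2-hour expiry would need a
timestamp kept inside the state and a check that returns Optional.absent()
once the key is too old:

import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

Function2<List<MyClass>, Optional<MyClass>, Optional<MyClass>> updateFunc =
    new Function2<List<MyClass>, Optional<MyClass>, Optional<MyClass>>() {
        @Override
        public Optional<MyClass> call(List<MyClass> newValues, Optional<MyClass> state) {
            MyClass current = state.orNull();
            for (MyClass v : newValues) {
                if (current == null) {
                    current = v;                  // first ("prev") record seen for this key
                } else {
                    current.setCrtd(v.getCrtd()); // hypothetical field copied from the "next" record
                }
            }
            // Returning Optional.absent() removes the key from the state,
            // which is where an age check for the 2-hour expiry would go.
            return Optional.fromNullable(current);
        }
    };

JavaPairDStream<String, MyClass> stateStream = pairStream.updateStateByKey(updateFunc);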

On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu 
wrote:

> Looks like mapWithState could help you?
> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" 
> wrote:
>
>> Hi All,
>>
>> I have an use case like follows in my production environment where I am
>> listening from kafka with slideInterval of 1 min and windowLength of 2
>> hours.
>>
>> I have a JavaPairDStream where for each key I am getting the same key but
>> with different value,which might appear in the same batch or some next
>> batch.
>>
>> When the key appears second time I need to update a field in value of
>> previous key with a field in the later key. The keys for which the
>> combination keys do not come should be rejected after 2 hours.
>>
>> At the end of each second I need to output the result to external
>> database.
>>
>> For example :
>>
>> Suppose valueX is object of MyClass with fields int a, String b
>> At t=1sec I am getting
>> key0,value0(0,"prev0")
>> key1,value1 (1, "prev1")
>> key2,value2 (2,"prev2")
>> key2,value3 (3, "next2")
>>
>> Output to database after 1 sec
>> key2, newValue (2,"next2")
>>
>> At t=2 sec getting
>> key3,value4(4,"prev3")
>> key1,value5(5,"next1")
>>
>> Output to database after 2 sec
>> key1,newValue(1,"next1")
>>
>> At t=3 sec
>> key4,value6(6,"prev4")
>> key3,value7(7,"next3")
>> key5,value5(8,"prev5")
>> key5,value5(9,"next5")
>> key0,value0(10,"next0")
>>
>> Output to database after 3 sec
>> key0,newValue(0,"next0")
>> key3,newValue(4,"next3")
>> key5,newValue(8,"next5")
>>
>>
>> Please suggest how this can be achieved.
>>
>>
>> Thanks a lot 
>> Abhi
>>
>>
>>


Stateful Operation on JavaPairDStream Help Needed !!

2016-02-11 Thread Abhishek Anand
Hi All,

I have a use case like the following in my production environment, where I
am listening to Kafka with a slideInterval of 1 min and a windowLength of 2
hours.

I have a JavaPairDStream where, for each key, I get the same key again but
with a different value, which might appear in the same batch or in some
later batch.

When the key appears the second time, I need to update a field in the value
of the earlier key with a field from the later key. Keys for which the
matching second key never arrives should be rejected after 2 hours.

At the end of each second I need to output the result to an external
database.

For example :

Suppose valueX is an object of MyClass with fields int a and String b
At t=1sec I am getting
key0,value0(0,"prev0")
key1,value1 (1, "prev1")
key2,value2 (2,"prev2")
key2,value3 (3, "next2")

Output to database after 1 sec
key2, newValue (2,"next2")

At t=2 sec getting
key3,value4(4,"prev3")
key1,value5(5,"next1")

Output to database after 2 sec
key1,newValue(1,"next1")

At t=3 sec
key4,value6(6,"prev4")
key3,value7(7,"next3")
key5,value5(8,"prev5")
key5,value5(9,"next5")
key0,value0(10,"next0")

Output to database after 3 sec
key0,newValue(0,"next0")
key3,newValue(4,"next3")
key5,newValue(8,"next5")


Please suggest how this can be achieved.


Thanks a lot 
Abhi


Re: Repartition taking place for all previous windows even after checkpointing

2016-02-01 Thread Abhishek Anand
Any insights on this ?


On Fri, Jan 29, 2016 at 1:08 PM, Abhishek Anand 
wrote:

> Hi All,
>
> Can someone help me with the following doubts regarding checkpointing :
>
> My code flow is something like follows ->
>
> 1) create direct stream from kafka
> 2) repartition kafka stream
> 3)  mapToPair followed by reduceByKey
> 4)  filter
> 5)  reduceByKeyAndWindow without the inverse function
> 6)  write to cassandra
>
> Now when I restart my application from checkpoint, I see repartition and
> other steps being called for the previous windows which takes longer and
> delays my aggregations.
>
> My understanding  was that once data checkpointing is done it should not
> re-read from kafka and use the saved RDDs but guess I am wrong.
>
> Is there a way to avoid the repartition or any workaround for this.
>
> Spark Version is 1.4.0
>
> Cheers !!
> Abhi
>


Repartition taking place for all previous windows even after checkpointing

2016-01-28 Thread Abhishek Anand
Hi All,

Can someone help me with the following doubts regarding checkpointing :

My code flow is something like follows ->

1) create direct stream from kafka
2) repartition kafka stream
3)  mapToPair followed by reduceByKey
4)  filter
5)  reduceByKeyAndWindow without the inverse function
6)  write to cassandra

Now, when I restart my application from a checkpoint, I see the repartition
and other steps being executed again for the previous windows, which takes
longer and delays my aggregations.

My understanding was that once data checkpointing is done it should not
re-read from Kafka but should use the saved RDDs instead, but I guess I am
wrong.

Is there a way to avoid the repartition, or any workaround for this?

Spark Version is 1.4.0

Cheers !!
Abhi


Re: Worker's BlockManager Folder not getting cleared

2016-01-26 Thread Abhishek Anand
Hi Adrian,

I am running spark in standalone mode.

The spark version that I am using is 1.4.0

Thanks,
Abhi

On Tue, Jan 26, 2016 at 4:10 PM, Adrian Bridgett 
wrote:

> Hi Abhi - are you running on Mesos perchance?
>
> If so then with spark <1.6 you will be hitting
> https://issues.apache.org/jira/browse/SPARK-10975
> With spark >= 1.6:
> https://issues.apache.org/jira/browse/SPARK-12430
> and also be aware of:
> https://issues.apache.org/jira/browse/SPARK-12583
>
>
> On 25/01/2016 07:14, Abhishek Anand wrote:
>
> Hi All,
>
> How long the shuffle files and data files are stored on the block manager
> folder of the workers.
>
> I have a spark streaming job with window duration of 2 hours and slide
> interval of 15 minutes.
>
> When I execute the following command in my block manager path
>
> find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;
>
> I see a lot of files which means that they are not getting cleared which I
> was expecting that they should get cleared.
>
> Subsequently, this size keeps on increasing and takes space on the disk.
>
> Please suggest how to get rid of this and help on understanding this
> behaviour.
>
>
>
> Thanks !!!
> Abhi
>
>
> --
> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
> <http://www.opensignal.com>
> _
> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
> Phone #: +44 777-377-8251
> Skype: abridgett  |  @adrianbridgett <http://twitter.com/adrianbridgett>  |
>  LinkedIn link  <https://uk.linkedin.com/in/abridgett>
> _
>


Worker's BlockManager Folder not getting cleared

2016-01-24 Thread Abhishek Anand
Hi All,

How long are the shuffle files and data files stored in the block manager
folder of the workers?

I have a spark streaming job with a window duration of 2 hours and a slide
interval of 15 minutes.

When I execute the following command in my block manager path

find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;

I see a lot of files, which means that they are not getting cleared even
though I was expecting them to be.

Subsequently, the size of this folder keeps increasing and takes up space
on the disk.

Please suggest how to get rid of this, and help me understand this
behaviour.



Thanks !!!
Abhi


Getting kafka offsets at beginning of spark streaming application

2016-01-11 Thread Abhishek Anand
Hi,

Is there a way to fetch the offsets from which Spark Streaming starts
reading Kafka when my application starts ?

What I am trying to do is create an initial RDD spanning from the offsets
at a particular time, passed as input on the command line, up to the
offsets from which my Spark Streaming starts.

Eg -

Partition 0 -> 1000 to (offset at which my spark streaming starts)

Thanks !!
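
If the offsets are being saved somewhere (for example the Cassandra table
discussed in the other thread), one way to control where the stream starts
is the createDirectStream overload that takes a fromOffsets map; the
starting offsets of every batch, including the first one, are then also
visible through HasOffsetRanges inside foreachRDD. A rough sketch, with the
broker, topic and offset values as placeholders and jssc assumed to be an
existing JavaStreamingContext:

import java.util.HashMap;
import java.util.Map;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "broker1:9092");

// Start offsets per partition, e.g. looked up by timestamp or read from storage.
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("mytopic", 0), 1000L);

JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class, String.class,
    StringDecoder.class, StringDecoder.class,
    String.class,
    kafkaParams,
    fromOffsets,
    new Function<MessageAndMetadata<String, String>, String>() {
        @Override
        public String call(MessageAndMetadata<String, String> mmd) {
            return mmd.message();
        }
    });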


Error on using updateStateByKey

2015-12-18 Thread Abhishek Anand
I am trying to use updateStateByKey but am receiving the following error
(Spark version 1.4.0).

Can someone please point out what the possible reason for this error might
be?


*The method
updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>>)
in the type JavaPairDStream<K, V> is not applicable
for the arguments
(Function2<List<AggregationMetrics>, Optional<AggregationMetrics>,
Optional<AggregationMetrics>>)*


This is the update function that I am using inside updateStateByKey.

I am applying updateStateByKey on a tuple of 

private static Function2<List<AggregationMetrics>,
        Optional<AggregationMetrics>, Optional<AggregationMetrics>> updateFunction =
    new Function2<List<AggregationMetrics>, Optional<AggregationMetrics>,
            Optional<AggregationMetrics>>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Optional<AggregationMetrics> call(List<AggregationMetrics> values,
                Optional<AggregationMetrics> current) {
            AggregationMetrics newSum = current.or(new AggregationMetrics(0L, 0L, 0L));
            for (int i = 0; i < values.size(); i++) {
                // set with new values
            }
            return Optional.of(newSum);
        }
    };



Thanks,
Abhi
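
One thing worth checking with this kind of "not applicable for the
arguments" error is the imports: the Spark 1.x Java API expects
com.google.common.base.Optional (Guava) and java.util.List here, so
importing a different Optional or List makes the otherwise identical-looking
signature fail to match. With matching imports, and assuming pairStream is a
JavaPairDStream<String, AggregationMetrics>, the declaration above would be
used like this:

import java.util.List;                    // the java.util List
import com.google.common.base.Optional;   // the Guava Optional used by the Spark 1.x Java API

// With matching imports, the update function above compiles and can be applied as:
JavaPairDStream<String, AggregationMetrics> stateDStream =
    pairStream.updateStateByKey(updateFunction);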


Re: Unable to use "Batch Start Time" on worker nodes.

2015-11-30 Thread Abhishek Anand
Thanks TD !!

I think this should serve my purpose.




On Sun, Nov 29, 2015 at 6:17 PM, Tathagata Das  wrote:

> You can get the batch start (the expected, not the exact time when the
> jobs are submitted) from DStream operation "transform". There is a version
> of transform that allows you to specify a function with two params - the
> parent RDD and the batch time at which the RDD was generated.
>
> TD
>
> On Thu, Nov 26, 2015 at 1:33 PM, Abhishek Anand 
> wrote:
>
>> Hi ,
>>
>> I need to use batch start time in my spark streaming job.
>>
>> I need the value of batch start time inside one of the functions that is
>> called within a flatmap function in java.
>>
>> Please suggest me how this can be done.
>>
>> I tried to use the StreamingListener class and set the value of a
>> variable inside the onBatchSubmitted function something like this :
>>
>> public void onBatchSubmitted(StreamingListenerBatchSubmitted
>> batchSubmitted) { batchstarttime =
>> batchSubmitted.batchInfo().batchTime().milliseconds();
>>   CommandLineArguments.BATCH_START_TIME = batchstarttime;
>>  }
>>
>>
>> But, the issue is that the BATCH_START_TIME set only when the batch
>> starts. I see in the worker logs that BATCH_START_TIME takes the default
>> value and is not set.
>>
>>
>> Please suggest how this can be achieved.
>>
>>
>>
>> BR,
>> Abhi
>>
>
>
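
A minimal sketch of the transform variant TD refers to, assuming a
JavaDStream<String> named lines; the batch time is received on the driver
and the captured value is then available in the closures that run on the
workers:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

JavaDStream<String> withBatchTime = lines.transform(
    new Function2<JavaRDD<String>, Time, JavaRDD<String>>() {
        @Override
        public JavaRDD<String> call(JavaRDD<String> rdd, Time batchTime) {
            final long batchStartMs = batchTime.milliseconds();
            return rdd.map(new Function<String, String>() {
                @Override
                public String call(String record) {
                    // batchStartMs travels with the closure to the executors
                    return batchStartMs + "\t" + record;
                }
            });
        }
    });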


Unable to use "Batch Start Time" on worker nodes.

2015-11-26 Thread Abhishek Anand
Hi ,

I need to use batch start time in my spark streaming job.

I need the value of the batch start time inside one of the functions that
is called within a flatMap function in Java.

Please suggest how this can be done.

I tried to use the StreamingListener class and set the value of a variable
inside the onBatchSubmitted function, something like this:

public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
    batchstarttime = batchSubmitted.batchInfo().batchTime().milliseconds();
    CommandLineArguments.BATCH_START_TIME = batchstarttime;
}


But the issue is that BATCH_START_TIME is set only when the batch starts,
and only on the driver. I see in the worker logs that BATCH_START_TIME
takes the default value and is not set.


Please suggest how this can be achieved.



BR,
Abhi


Getting the batch time of the active batches in spark streaming

2015-11-24 Thread Abhishek Anand
Hi ,

I need to get the batch time of the active batches, which appears on the
Streaming tab of the Spark UI.

How can this be achieved in Java ?

BR,
Abhi


External Table not getting updated from parquet files written by spark streaming

2015-11-19 Thread Abhishek Anand
Hi ,

I am using Spark Streaming to write the aggregated output as parquet files
to HDFS using SaveMode.Append. I have an external table created like:


CREATE TABLE if not exists rolluptable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "hdfs:"
);

I was under the impression that, in the case of an external table, queries
should also fetch the data from the newly added parquet files. But it seems
the newly written files are not being picked up.

Dropping and recreating the table every time works fine, but that is not a
solution.


Please suggest how my table can pick up the data from the newer files as
well.



Thanks !!
Abhi
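
In case it helps anyone else who finds this thread: the parquet data source
caches its file listing, so one thing to try (a sketch, assuming the same
HiveContext that created the table is available, and that aggregatedDf and
the path are placeholders) is refreshing the table after each batch has
appended new files:

import org.apache.spark.sql.SaveMode;

// After the batch's DataFrame has been appended to the table's path ...
aggregatedDf.write().mode(SaveMode.Append).parquet("hdfs://namenode:8020/path/to/rolluptable");

// ... invalidate the cached file listing so queries on the external table
// see the newly written parquet files.
hiveContext.refreshTable("rolluptable");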