Re: Live Streamed Code Review today at 11am Pacific

2018-03-09 Thread Holden Karau
If anyone wants to watch the recording:
https://www.youtube.com/watch?v=lugG_2QU6YU

I'll do one next week as well - March 16th @ 11am -
https://www.youtube.com/watch?v=pXzVtEUjrLc

On Fri, Mar 9, 2018 at 9:28 AM, Holden Karau  wrote:

> Hi folks,
>
> If you're curious about learning more about how Spark is developed, I'm
> going to experiment with doing a live code review where folks can watch and
> see how that part of our process works. I have two volunteers already for
> having their PRs looked at live, and if you have a Spark PR you're working
> on that you'd like me to livestream a review of, please ping me.
>
> The livestream will be at https://www.youtube.com/watch?v=lugG_2QU6YU.
>
> Cheers,
>
> Holden :)
> --
> Twitter: https://twitter.com/holdenkarau
>



-- 
Twitter: https://twitter.com/holdenkarau


Re: Upgrades of streaming jobs

2018-03-09 Thread Tathagata Das
Yes, all checkpoints are forward compatible.

However, you do need to restart the query if you want to update its code.
This downtime can be less than a second (if you just restart the query
without stopping the application/Spark driver) or tens of seconds (if you
have to stop the application and resubmit it to the cluster).
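
A minimal sketch of the first option (restart only the query, not the driver,
reusing the same checkpoint location); the sink format, paths, and DataFrame
names below are placeholders:

```python
# start the query with an explicit checkpoint location
query = (df.writeStream
         .format("parquet")
         .option("path", "/data/out")
         .option("checkpointLocation", "/checkpoints/my_query")
         .start())

# ... after deploying the updated query code:
query.stop()                       # stops only this query, not the SparkSession/driver
query = (updatedDf.writeStream     # the new version of the query
         .format("parquet")
         .option("path", "/data/out")
         .option("checkpointLocation", "/checkpoints/my_query")  # same checkpoint dir
         .start())
```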



On Thu, Mar 8, 2018 at 12:11 PM, Georg Heiler 
wrote:

> Hi
>
> What is the state of spark structured streaming jobs and upgrades?
>
> Can checkpoints of version 1 be read by version 2 of a job? Is downtime
> required to upgrade the job?
>
> Thanks
>


Live Streamed Code Review today at 11am Pacific

2018-03-09 Thread Holden Karau
Hi folks,

If you're curious about learning more about how Spark is developed, I'm going
to experiment with doing a live code review where folks can watch and see how
that part of our process works. I have two volunteers already for having their
PRs looked at live, and if you have a Spark PR you're working on that you'd
like me to livestream a review of, please ping me.

The livestream will be at https://www.youtube.com/watch?v=lugG_2QU6YU.

Cheers,

Holden :)
-- 
Twitter: https://twitter.com/holdenkarau


Issue with using Generalized Linear Regression for Logistic Regression modeling

2018-03-09 Thread FireFly
The Logistic Regression (LR) offered by Spark has rather limited model
statistics output. I would like to have access to q-values, AIC, standard
errors, etc. Generalized Linear Regression (GLR) does offer these statistics
in the model output, and can be used as LR if one specifies
family="binomial", link="logit" in the GLR. The issue I ran into is that
some models converge nicely using Logistic Regression, but not using
Generalized Linear Regression. For other models, I do see them converge to
the same result using either LR or GLR.
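
For reference, the kind of statistics I mean are exposed on the fitted GLR
model's training summary in PySpark; a minimal sketch (glrModel is a
placeholder name for the fitted GeneralizedLinearRegressionModel):

```python
# statistics available on the training summary of a fitted GLR model
summary = glrModel.summary
print(summary.aic)                        # AIC
print(summary.coefficientStandardErrors)  # standard errors of the coefficients
print(summary.pValues)                    # two-sided p-values
```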

I played around with the solver options in GLR; it didn't help. The option
that does make a difference is the weightCol. Without it, both LR and GLR
converge to the same thing (whether that makes sense or not aside). With the
weightCol included, LR converges, in about 10 iterations, to the same result
as what we got using SAS; GLR just won't converge (I tried 1 iterations), and
the model coefficients at the end of the run, where the maximum number of
iterations was hit, are in the 10^12 range, which is way off.

I am using Spark 2.2.0 currently. The relevant part of the code is pasted
below.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import GeneralizedLinearRegression

trainingData = sqlContext.read.load(args.input_df_name).repartition(args.repartition)

catCol = ['rwdproduct2', 'state_final', 'mixed', 'ocup', 'SECURED', 'o_channel',
          'season', 'sa_C_ten_buck', 'sa_C_fico_buck', 'sa_C_otb_buck']
numCol = ['PRIME_ma_6L36']

colNameModStr = "_class"
catColClass = [colName + colNameModStr for colName in catCol]

# index and one-hot encode the categorical columns
stages = []
for col in catCol:
    stringIndexer = StringIndexer(inputCol=col, outputCol=col + "Index")
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=col + colNameModStr)
    stages += [stringIndexer, encoder]

assembler = VectorAssembler(inputCols=catColClass + numCol, outputCol='features')

glr = GeneralizedLinearRegression(family="binomial", link="logit",
                                  solver="SGD", weightCol="wt", labelCol="bad",
                                  maxIter=20, tol=1.0E-12, regParam=0)

pipeline = Pipeline(stages=stages + [assembler, glr])

modelDF = pipeline.fit(trainingData)

# --- Output some modeling results
print("Model Betas is {}".format(modelDF.stages[-1].coefficients))

Appreciate any help you would offer to resolve this.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
But overall, I think the original approach is not correct.
If you get a single file in the tens of GB, the approach probably must be
reworked.

I don't see why you can't just write multiple CSV files using Spark, and
then concatenate them without Spark.
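
A rough sketch of that (the output path is a placeholder; the merge step
happens outside Spark):

```python
# write the result as many part files in parallel (no coalesce/repartition to 1)
myDF.write.format("csv").save("data/file_parts")

# then concatenate the parts outside Spark, e.g.:
#   hadoop fs -getmerge data/file_parts data/file.csv
# or, on a local filesystem:
#   cat data/file_parts/part-* > data/file.csv
```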

On Fri, Mar 9, 2018 at 10:02 AM, Vadim Semenov  wrote:

> You can use `.checkpoint` for that
>
> `df.sort(…).coalesce(1).write...` — `coalesce` will make `sort` to have
> only one partition, so sorting will take a lot of time
>
> `df.sort(…).repartition(1).write...` — `repartition` will add an explicit
> stage, but sorting will be lost, since it's a repartition
>
> ```
> sc.setCheckpointDir("/tmp/test")
> val checkpointedDf = df.sort(…).checkpoint(eager=true) // will save all
> partitions
> checkpointedDf.coalesce(1).write.csv(…) // will load checkpointed
> partitions in one task, concatenate them, and will write them out as a
> single file
> ```
>
> On Fri, Mar 9, 2018 at 9:47 AM, Deepak Sharma 
> wrote:
>
>> I would suggest repartioning it to reasonable partitions  may ne 500 and
>> save it to some intermediate working directory .
>> Finally read all the files from this working dir and then coalesce as 1
>> and save to final location.
>>
>> Thanks
>> Deepak
>>
>> On Fri, Mar 9, 2018, 20:12 Vadim Semenov  wrote:
>>
>>> because `coalesce` gets propagated further up in the DAG in the last
>>> stage, so your last stage only has one task.
>>>
>>> You need to break your DAG so your expensive operations would be in a
>>> previous stage before the stage with `.coalesce(1)`
>>>
>>> On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <
>>> rezaul.ka...@insight-centre.org> wrote:
>>>
 Dear All,

 I have a tiny CSV file, which is around 250MB. There are only 30
 columns in the DataFrame. Now I'm trying to save the pre-processed
 DataFrame as an another CSV file on disk for later usage.

 However, I'm getting pissed off as writing the resultant DataFrame is
 taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
 file written on the disk is about 58GB!

 Here's the sample code that I tried:

 # Using repartition()
 myDF.repartition(1).write.format("com.databricks.spark.csv")
 .save("data/file.csv")

 # Using coalesce()
 myDF. coalesce(1).write.format("com.databricks.spark.csv").save("d
 ata/file.csv")


 Any better suggestion?



 
 Md. Rezaul Karim, BSc, MSc
 Research Scientist, Fraunhofer FIT, Germany

 Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany

 eMail: rezaul.ka...@fit.fraunhofer.de
 
 Tel: +49 241 80-21527 <+49%20241%208021527>

>>>
>>>
>>>
>>> --
>>> Sent from my iPhone
>>>
>>
>
>
> --
> Sent from my iPhone
>



-- 
Sent from my iPhone


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Silvio Fiorito
Given you start with ~250MB but end up with 58GB, it seems like you're generating
quite a bit of data.

Whether you use coalesce or repartition, writing out 58GB with one core is
still going to take a while.

Using Spark to do pre-processing but output a single file is not going to be
very efficient, since you're asking Spark to limit its parallelism, even if
only for the final stage that writes the data out.

What are you using downstream to read this file, and why does it need to be a
single 58GB file? Could you simply keep it in Spark to keep the pipeline
optimized and avoid the data persistence step? For example, if you're using R
or Python to do some downstream processing, you could just make that part of
your pipeline instead of writing it out and then reading it back in from another
system.


From: Vadim Semenov 
Date: Friday, March 9, 2018 at 9:42 AM
To: "Md. Rezaul Karim" 
Cc: spark users 
Subject: Re: Writing a DataFrame is taking too long and huge space

because `coalesce` gets propagated further up in the DAG in the last stage, so 
your last stage only has one task.

You need to break your DAG so your expensive operations would be in a previous 
stage before the stage with `.coalesce(1)`

On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim
<rezaul.ka...@insight-centre.org> wrote:
Dear All,
I have a tiny CSV file, which is around 250MB. There are only 30 columns in the 
DataFrame. Now I'm trying to save the pre-processed DataFrame as an another CSV 
file on disk for later usage.
However, I'm getting pissed off as writing the resultant DataFrame is taking 
too long, which is about 4 to 5 hours. Nevertheless, the size of the file 
written on the disk is about 58GB!

Here's the sample code that I tried:
# Using repartition()
myDF.repartition(1).write.format("com.databricks.spark.csv").save("data/file.csv")

# Using coalesce()
myDF. coalesce(1).write.format("com.databricks.spark.csv").save("data/file.csv")

Any better suggestion?





Md. Rezaul Karim, BSc, MSc
Research Scientist, Fraunhofer FIT, Germany

Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany

eMail: rezaul.ka...@fit.fraunhofer.de
Tel: +49 241 80-21527



--
Sent from my iPhone


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
You can use `.checkpoint` for that

`df.sort(…).coalesce(1).write...` — `coalesce` will make `sort` have
only one partition, so sorting will take a lot of time

`df.sort(…).repartition(1).write...` — `repartition` will add an explicit
stage, but sorting will be lost, since it's a repartition

```
sc.setCheckpointDir("/tmp/test")
val checkpointedDf = df.sort(…).checkpoint(eager=true) // will save all
partitions
checkpointedDf.coalesce(1).write.csv(…) // will load checkpointed
partitions in one task, concatenate them, and will write them out as a
single file
```

On Fri, Mar 9, 2018 at 9:47 AM, Deepak Sharma  wrote:

> I would suggest repartioning it to reasonable partitions  may ne 500 and
> save it to some intermediate working directory .
> Finally read all the files from this working dir and then coalesce as 1
> and save to final location.
>
> Thanks
> Deepak
>
> On Fri, Mar 9, 2018, 20:12 Vadim Semenov  wrote:
>
>> because `coalesce` gets propagated further up in the DAG in the last
>> stage, so your last stage only has one task.
>>
>> You need to break your DAG so your expensive operations would be in a
>> previous stage before the stage with `.coalesce(1)`
>>
>> On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Dear All,
>>>
>>> I have a tiny CSV file, which is around 250MB. There are only 30 columns
>>> in the DataFrame. Now I'm trying to save the pre-processed DataFrame as an
>>> another CSV file on disk for later usage.
>>>
>>> However, I'm getting pissed off as writing the resultant DataFrame is
>>> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
>>> file written on the disk is about 58GB!
>>>
>>> Here's the sample code that I tried:
>>>
>>> # Using repartition()
>>> myDF.repartition(1).write.format("com.databricks.spark.
>>> csv").save("data/file.csv")
>>>
>>> # Using coalesce()
>>> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("
>>> data/file.csv")
>>>
>>>
>>> Any better suggestion?
>>>
>>>
>>>
>>> 
>>> Md. Rezaul Karim, BSc, MSc
>>> Research Scientist, Fraunhofer FIT, Germany
>>>
>>> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>>>
>>> eMail: rezaul.ka...@fit.fraunhofer.de
>>> 
>>> Tel: +49 241 80-21527 <+49%20241%208021527>
>>>
>>
>>
>>
>> --
>> Sent from my iPhone
>>
>


-- 
Sent from my iPhone


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Deepak Sharma
I would suggest repartitioning it to a reasonable number of partitions, maybe
500, and saving it to some intermediate working directory.
Finally, read all the files from this working dir, then coalesce to 1 and
save to the final location.
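
A rough sketch of this approach (paths and the partition count are placeholders):

```python
# step 1: write with a reasonable number of partitions to an intermediate dir
myDF.repartition(500).write.format("csv").save("data/intermediate")

# step 2: read the intermediate files back and write them out as a single file
spark.read.format("csv").load("data/intermediate") \
    .coalesce(1).write.format("csv").save("data/final")
```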

Thanks
Deepak

On Fri, Mar 9, 2018, 20:12 Vadim Semenov  wrote:

> because `coalesce` gets propagated further up in the DAG in the last
> stage, so your last stage only has one task.
>
> You need to break your DAG so your expensive operations would be in a
> previous stage before the stage with `.coalesce(1)`
>
> On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <
> rezaul.ka...@insight-centre.org> wrote:
>
>> Dear All,
>>
>> I have a tiny CSV file, which is around 250MB. There are only 30 columns
>> in the DataFrame. Now I'm trying to save the pre-processed DataFrame as an
>> another CSV file on disk for later usage.
>>
>> However, I'm getting pissed off as writing the resultant DataFrame is
>> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
>> file written on the disk is about 58GB!
>>
>> Here's the sample code that I tried:
>>
>> # Using repartition()
>>
>> myDF.repartition(1).write.format("com.databricks.spark.csv").save("data/file.csv")
>>
>> # Using coalesce()
>> myDF.
>> coalesce(1).write.format("com.databricks.spark.csv").save("data/file.csv")
>>
>>
>> Any better suggestion?
>>
>>
>>
>> 
>> Md. Rezaul Karim, BSc, MSc
>> Research Scientist, Fraunhofer FIT, Germany
>>
>> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>>
>> eMail: rezaul.ka...@fit.fraunhofer.de 
>> Tel: +49 241 80-21527 <+49%20241%208021527>
>>
>
>
>
> --
> Sent from my iPhone
>


Contextual bandits

2018-03-09 Thread ey-chih chow
Hi,

Does Spark MLlib support contextual bandits? How can we use Spark MLlib to
implement a contextual bandit?

Thanks.

Best regards,

Ey-Chih



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
because `coalesce` gets propagated further up in the DAG in the last stage,
so your last stage only has one task.

You need to break your DAG so that your expensive operations end up in a
previous stage, before the stage with `.coalesce(1)`

On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Dear All,
>
> I have a tiny CSV file, which is around 250MB. There are only 30 columns
> in the DataFrame. Now I'm trying to save the pre-processed DataFrame as an
> another CSV file on disk for later usage.
>
> However, I'm getting pissed off as writing the resultant DataFrame is
> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
> file written on the disk is about 58GB!
>
> Here's the sample code that I tried:
>
> # Using repartition()
> myDF.repartition(1).write.format("com.databricks.spark.
> csv").save("data/file.csv")
>
> # Using coalesce()
> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("
> data/file.csv")
>
>
> Any better suggestion?
>
>
>
> 
> Md. Rezaul Karim, BSc, MSc
> Research Scientist, Fraunhofer FIT, Germany
>
> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>
> eMail: rezaul.ka...@fit.fraunhofer.de 
> Tel: +49 241 80-21527 <+49%20241%208021527>
>



-- 
Sent from my iPhone


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Md. Rezaul Karim
Hi All,

Thanks for the prompt responses. Really appreciated! Here's some more info:

1. Spark version: 2.3.0
2. vCore: 8
3. RAM: 32GB
4. Deploy mode: Spark standalone

*Operation performed:* I did transformations using StringIndexer on some
columns and null imputation. That's all.

*Why write back into CSV:* I need to write the DataFrame into CSV to be
used by a non-Spark application. Moreover, I need to perform
pre-processing on a larger dataset (about 2GB); this one is just a
simple example. So writing into Parquet or ORC is not a viable option for me.

I was trying to use Spark only for pre-processing. By the way, I tried
using Spark's built-in CSV library too.




Best,



Md. Rezaul Karim, BSc, MSc
Research Scientist, Fraunhofer FIT, Germany

Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany

eMail: rezaul.ka...@fit.fraunhofer.de 
Tel: +49 241 80-21527

On 9 March 2018 at 13:41, Teemu Heikkilä  wrote:

> Sounds like you’re doing something else than just writing the same file
> back to disk, what your preprocessing consists?
>
> Sometimes you can save lot’s of space by using other formats but now we’re
> talking over 200x increase in file size so depending on the transformations
> for the data you might not get so huge savings by using some other format.
>
> If you can give more details about what you are doing with the data we
> could probably help with your task.
>
> Slowness probably happens because Spark is using disk to process the data
> into single partition for writing the single file, one thing to reconsider
> is that if you can merge the product files after the process or even
> pre-partition it for it’s final use case.
>
> - Teemu
>
> On 9.3.2018, at 12.23, Md. Rezaul Karim 
> wrote:
>
> Dear All,
>
> I have a tiny CSV file, which is around 250MB. There are only 30 columns
> in the DataFrame. Now I'm trying to save the pre-processed DataFrame as an
> another CSV file on disk for later usage.
>
> However, I'm getting pissed off as writing the resultant DataFrame is
> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
> file written on the disk is about 58GB!
>
> Here's the sample code that I tried:
>
> # Using repartition()
> myDF.repartition(1).write.format("com.databricks.spark.
> csv").save("data/file.csv")
>
> # Using coalesce()
> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("
> data/file.csv")
>
>
> Any better suggestion?
>
>
>
> 
> Md. Rezaul Karim, BSc, MSc
> Research Scientist, Fraunhofer FIT, Germany
> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
> eMail: rezaul.ka...@fit.fraunhofer.de 
> Tel: +49 241 80-21527 <+49%20241%208021527>
>
>
>


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Teemu Heikkilä
Sounds like you’re doing something else than just writing the same file back to 
disk, what your preprocessing consists?

Sometimes you can save lot’s of space by using other formats but now we’re 
talking over 200x increase in file size so depending on the transformations for 
the data you might not get so huge savings by using some other format.

If you can give more details about what you are doing with the data we could 
probably help with your task.

Slowness probably happens because Spark is using disk to process the data into 
single partition for writing the single file, one thing to reconsider is that 
if you can merge the product files after the process or even pre-partition it 
for it’s final use case.

- Teemu

> On 9.3.2018, at 12.23, Md. Rezaul Karim  
> wrote:
> 
> Dear All,
> 
> I have a tiny CSV file, which is around 250MB. There are only 30 columns in 
> the DataFrame. Now I'm trying to save the pre-processed DataFrame as an 
> another CSV file on disk for later usage. 
> 
> However, I'm getting pissed off as writing the resultant DataFrame is taking 
> too long, which is about 4 to 5 hours. Nevertheless, the size of the file 
> written on the disk is about 58GB!  
> 
> Here's the sample code that I tried:
> 
> # Using repartition()
> myDF.repartition(1).write.format("com.databricks.spark.csv").save("data/file.csv")
> 
> # Using coalesce()
> myDF. 
> coalesce(1).write.format("com.databricks.spark.csv").save("data/file.csv")
> 
> 
> Any better suggestion? 
> 
> 
> 
>  
> Md. Rezaul Karim, BSc, MSc
> Research Scientist, Fraunhofer FIT, Germany
> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
> eMail: rezaul.ka...@fit.fraunhofer.de 
> 
> Tel: +49 241 80-21527



Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Gourav Dutta
Which version of spark are you using?

The reason for asking is that from Spark 2.x, CSV is a built-in data source,
so there is no need to save it with the com.databricks.spark.csv package.
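
For example, with the built-in CSV source (a sketch; the path is a placeholder):

```python
# Spark 2.x built-in CSV writer, no external package needed
myDF.coalesce(1).write.format("csv").option("header", "true").save("data/file.csv")
# or equivalently
myDF.coalesce(1).write.csv("data/file.csv", header=True)
```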

Moreover, the time taken for this simple task depends very much on your
cluster health. Could you please provide details of the following (if you are
using YARN)?

1. Number of nodes
2. VCores
3. Node Memory


Thanks,
Gourav Dutta

On Mar 9, 2018 5:35 PM, "Matteo Durighetto"  wrote:

Hello, try to use parquet format with compression ( like snappy or lz4 ) so
the produced files will be smaller and it will generate less i/o. Moreover
normally parquet is more faster than csv format in reading for further
operations .
Another possible format is ORC file.

Kind Regards

Matteo


2018-03-09 11:23 GMT+01:00 Md. Rezaul Karim :

> Dear All,
>
> I have a tiny CSV file, which is around 250MB. There are only 30 columns
> in the DataFrame. Now I'm trying to save the pre-processed DataFrame as an
> another CSV file on disk for later usage.
>
> However, I'm getting pissed off as writing the resultant DataFrame is
> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
> file written on the disk is about 58GB!
>
> Here's the sample code that I tried:
>
> # Using repartition()
> myDF.repartition(1).write.format("com.databricks.spark.csv")
> .save("data/file.csv")
>
> # Using coalesce()
> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("d
> ata/file.csv")
>
>
> Any better suggestion?
>
>
>
> 
> Md. Rezaul Karim, BSc, MSc
> Research Scientist, Fraunhofer FIT, Germany
>
> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>
> eMail: rezaul.ka...@fit.fraunhofer.de 
> Tel: +49 241 80-21527 <+49%20241%208021527>
>


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Matteo Durighetto
Hello, try to use the Parquet format with compression (like snappy or lz4) so
the produced files will be smaller and it will generate less I/O. Moreover,
Parquet is normally faster than CSV to read in further operations.
Another possible format is ORC.
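
For example (a sketch; output paths are placeholders):

```python
# Parquet with snappy compression
myDF.write.option("compression", "snappy").parquet("data/file_parquet")

# ORC with zlib compression
myDF.write.option("compression", "zlib").orc("data/file_orc")
```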

Kind Regards

Matteo


2018-03-09 11:23 GMT+01:00 Md. Rezaul Karim :

> Dear All,
>
> I have a tiny CSV file, which is around 250MB. There are only 30 columns
> in the DataFrame. Now I'm trying to save the pre-processed DataFrame as an
> another CSV file on disk for later usage.
>
> However, I'm getting pissed off as writing the resultant DataFrame is
> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
> file written on the disk is about 58GB!
>
> Here's the sample code that I tried:
>
> # Using repartition()
> myDF.repartition(1).write.format("com.databricks.spark.
> csv").save("data/file.csv")
>
> # Using coalesce()
> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("
> data/file.csv")
>
>
> Any better suggestion?
>
>
>
> 
> Md. Rezaul Karim, BSc, MSc
> Research Scientist, Fraunhofer FIT, Germany
>
> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>
> eMail: rezaul.ka...@fit.fraunhofer.de 
> Tel: +49 241 80-21527 <+49%20241%208021527>
>


Connection SparkStreaming with SchemaRegistry

2018-03-09 Thread Guillermo Ortiz
I'm trying to integrate Schema Registry with Spark Streaming. For the
moment I want to use GenericRecords. It seems that my producer works and
new schemas are published to the _schemas topic. When I try to read with my
consumer, I'm not able to deserialize the data.

How can I tell Spark that I'm going to deserialize to GenericRecord?



public class SparkStreamingSchemaRegister {

    public static void main(String[] args) throws InterruptedException {
        String topic = "avro_example_schemaRegistry";

        final JavaStreamingContext jssc = new JavaStreamingContext(getSparkConf(),
                Durations.milliseconds(Constants.STREAM_BATCH_DURATION_MILLIS));

        final JavaInputDStream<ConsumerRecord<byte[], GenericRecord>> rawStream =
                KafkaSource.getKafkaDirectStream(jssc);

        rawStream.foreachRDD(rdd -> {
            JavaRDD<CrmClient> javaRddClient = rdd.map(
                    kafkaRecord -> {
                        GenericRecord record = kafkaRecord.value(); // --> ERROR
                        return CrmClient.getCrmClient(kafkaRecord.value());
                    });

            CassandraJavaUtil
                    .javaFunctions(javaRddClient)
                    .writerBuilder("keyspace", "client",
                            CassandraJavaUtil.mapToRow(CrmClient.class))
                    .withColumnSelector(CassandraJavaUtil.someColumns("id",
                            "name", "lastname"))
                    .saveToCassandra();
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }

    private static class KafkaSource {

        public static JavaInputDStream<ConsumerRecord<byte[], GenericRecord>>
                getKafkaDirectStream(JavaStreamingContext jssc) {
            JavaInputDStream<ConsumerRecord<byte[], GenericRecord>> stream =
                    KafkaUtils.createDirectStream(jssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.Subscribe(getKafkaTopic(),
                                    getKafkaConf()));
            return stream;
        }

        private static Map<String, Object> getKafkaConf() {
            Map<String, Object> kafkaParams = new HashMap<>();

            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_BOOTSTRAP_SERVERS.getValue(),
                    Constants.KAFKA_BOOTSTRAP_SERVERS);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_KEY_DESERIALIZER.getValue(),
                    ByteArrayDeserializer.class);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_GROUPID.getValue(),
                    Constants.KAFKA_GROUP_ID);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_ENABLE_AUTO_COMMIT.getValue(),
                    false);
            kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_AUTO_OFFSET_RESET.getValue(),
                    "earliest");

            // value deserializer: Confluent's KafkaAvroDeserializer
            kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    KafkaAvroDeserializer.class.getName());
            kafkaParams.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
                    "false");
            kafkaParams.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                    "http://localhost:8081");

            return kafkaParams;
        }
    }
}


Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Md. Rezaul Karim
Dear All,

I have a tiny CSV file, which is around 250MB. There are only 30 columns in
the DataFrame. Now I'm trying to save the pre-processed DataFrame as
another CSV file on disk for later usage.

However, I'm getting pissed off because writing the resultant DataFrame is
taking too long, about 4 to 5 hours. Moreover, the size of the file written
on the disk is about 58GB!

Here's the sample code that I tried:

# Using repartition()
myDF.repartition(1).write.format("com.databricks.spark.csv").save("data/file.csv")

# Using coalesce()
myDF.coalesce(1).write.format("com.databricks.spark.csv").save("data/file.csv")


Any better suggestion?




Md. Rezaul Karim, BSc, MSc
Research Scientist, Fraunhofer FIT, Germany

Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany

eMail: rezaul.ka...@fit.fraunhofer.de 
Tel: +49 241 80-21527