Re: Replicating a row n times

2017-09-29 Thread Kanagha Kumar
Thanks for the response.
I can use either row_number() or monotonicallyIncreasingId to generate
unique IDs, as in
https://hadoopist.wordpress.com/2016/05/24/generate-unique-ids-for-each-rows-in-a-spark-dataframe/

I'm looking for a Java example that replicates a single row n times, either
by appending a rownum column generated as above or by using the explode
function.

Ex:

ds.withColumn("ROWNUM", org.apache.spark.sql.functions.explode(columnEx));

columnEx needs to be of array type in order for explode to work.

Any suggestions are helpful.
Thanks


On Thu, Sep 28, 2017 at 7:21 PM, ayan guha  wrote:

> How about using row number for primary key?
>
> Select row_number() over (), * from table
>
> On Fri, 29 Sep 2017 at 10:21 am, Kanagha Kumar 
> wrote:
>
>> Hi,
>>
>> I'm trying to replicate a single row from a dataset n times and create a
>> new dataset from it. But, while replicating I need a column's value to be
>> changed for each replication since it would be end up as the primary key
>> when stored finally.
>>
>> Looked at the following reference: https://stackoverflow.com/questions/40397740/replicate-spark-row-n-times
>>
>> import org.apache.spark.sql.functions._
>> val result = singleRowDF
>>   .withColumn("dummy", explode(array((1 until 100).map(lit): _*)))
>>   .selectExpr(singleRowDF.columns: _*)
>>
>> How can I create a column from an array of values in Java and pass it to
>> explode function? Suggestions are helpful.
>>
>>
>> Thanks
>> Kanagha
>>
> --
> Best Regards,
> Ayan Guha
>


Re: Applying a Java script to many files: Java API or also Python API?

2017-09-29 Thread Weichen Xu
Although Python can launch a subprocess to run Java code, in PySpark the
processing code that needs to run in parallel on the cluster has to be
written in Python. For example, in PySpark:

def f(x):
    ...

rdd.map(f)  # the function `f` must be pure Python code

If you try to launch a subprocess to run Java code inside `f`, it will add a
large overhead and bring many other issues.

On Thu, Sep 28, 2017 at 5:36 PM, Giuseppe Celano <
cel...@informatik.uni-leipzig.de> wrote:

> Hi,
>
> What I meant is that I could run the Java script using the subprocess
> module in Python. In that case is any difference (from directly coding in
> the Java API)  in performance expected? Thanks.
>
>
>
> On Sep 28, 2017, at 3:32 AM, Weichen Xu  wrote:
>
> I think you have to use Spark Java API, in PySpark, functions running on
> spark executors (such as map function) can only written in python.
>
> On Thu, Sep 28, 2017 at 12:48 AM, Giuseppe Celano  leipzig.de> wrote:
>
>> Hi everyone,
>>
>> I would like to apply a java script to many files in parallel. I am
>> wondering whether I should definitely use the Spark Java API, or I could
>> also run the script using the Python API (with which I am more familiar
>> with), without this affecting performance. Thanks.
>>
>> Giuseppe
>>
>>
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>


Structured Streaming and Hive

2017-09-29 Thread HanPan
Hi guys,

I'm new to Spark Structured Streaming. I'm using 2.1.0, and my scenario is
reading a specific topic from Kafka, doing some data mining tasks, and then
saving the result dataset to Hive.

Writing the data to Hive somehow seems not to be supported yet, and I tried
this:

It runs OK, but there is no result in Hive.

Any idea how to write the stream result to Hive?

Thanks

Pan
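
Since there is no Hive sink for Structured Streaming in 2.1, one workaround that is often suggested is writing with the file sink into the location of an external Hive table. A rough Scala sketch, where the dataset name, paths and checkpoint location are placeholders:

```
// resultDS is the streaming Dataset produced by the mining step (assumed name).
// The file sink only supports append output mode.
val query = resultDS.writeStream
  .format("parquet")
  .option("path", "hdfs:///user/hive/warehouse/mydb.db/stream_result")  // external table location (placeholder)
  .option("checkpointLocation", "hdfs:///checkpoints/stream_result")    // required for the file sink
  .outputMode("append")
  .start()

query.awaitTermination()
```

Hive then sees the data through an external table created over that path; if the table is partitioned, newly written partitions still need to be registered in the metastore (for example with MSCK REPAIR TABLE).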

 

 



Re: Replicating a row n times

2017-09-29 Thread Weichen Xu
I suggest you use `monotonicallyIncreasingId`, which is highly efficient.
But note that the IDs it generates will not be consecutive.
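
For illustration, a minimal Scala sketch that combines the two ideas in this thread (n is an assumed replication factor); the same static functions — array, lit, explode, monotonically_increasing_id — exist on org.apache.spark.sql.functions in the Java API, so the calls translate directly:

```
import org.apache.spark.sql.functions._

val n = 100                                       // assumed replication factor
val replicated = singleRowDF
  .withColumn("ROWNUM", explode(array((0 until n).map(lit): _*)))  // one copy per literal
  .withColumn("UNIQUE_ID", monotonically_increasing_id())          // unique, but not consecutive
```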

On Fri, Sep 29, 2017 at 3:21 PM, Kanagha Kumar 
wrote:

> Thanks for the response.
> I can use either row_number() or monotonicallyIncreasingId to generate
> uniqueIds as in https://hadoopist.wordpress.com/2016/05/24/generate-unique-ids-for-each-rows-in-a-spark-dataframe/
>
> I'm looking for a java example to use that to replicate a single row n
> times by appending a rownum column generated as above or using explode
> function.
>
> Ex:
>
> ds.withColumn("ROWNUM", org.apache.spark.sql.functions.explode(columnEx));
>
> columnEx needs to be of type array inorder for explode to work.
>
> Any suggestions are helpful.
> Thanks
>
>
> On Thu, Sep 28, 2017 at 7:21 PM, ayan guha  wrote:
>
>> How about using row number for primary key?
>>
>> Select row_number() over (), * from table
>>
>> On Fri, 29 Sep 2017 at 10:21 am, Kanagha Kumar 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to replicate a single row from a dataset n times and create a
>>> new dataset from it. But, while replicating I need a column's value to be
>>> changed for each replication since it would be end up as the primary key
>>> when stored finally.
>>>
>>> Looked at the following reference: https://stackoverflow.com/questions/40397740/replicate-spark-row-n-times
>>>
>>> import org.apache.spark.sql.functions._
>>> val result = singleRowDF
>>>   .withColumn("dummy", explode(array((1 until 100).map(lit): _*)))
>>>   .selectExpr(singleRowDF.columns: _*)
>>>
>>> How can I create a column from an array of values in Java and pass it to
>>> explode function? Suggestions are helpful.
>>>
>>>
>>> Thanks
>>> Kanagha
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


[Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread Gaurav1809
Hi All,

I have a multi-node Spark cluster (1 master, 2 workers). The job reads CSV
file data and works fine when run in local mode (local[*]). However, when the
same job is run in cluster mode (spark://HOST:PORT), it is not able to read
the file. I want to know how to reference the files, or where to store them.
Currently the CSV data file is on the master (from where the job is
submitted).

Following code works fine in local mode but not in cluster mode.

val spark = SparkSession
  .builder()
  .appName("SampleFlightsApp")
  .master("spark://masterIP:7077") // change it to .master("local[*])
for local mode
  .getOrCreate()

val flightDF =
spark.read.option("header",true).csv("/home/username/sampleflightdata")
flightDF.printSchema()

Error: FileNotFoundException: File file:/home/gaurav/sampleflightdata does
not exist



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread Sathishkumar Manimoorthy
Place it in HDFS and give the reference path in your code.

Thanks,
Sathish
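
For illustration, once the CSV has been copied into HDFS, only the path in the original snippet needs to change (the namenode host, port and directory below are placeholders):

```
// every executor can resolve an HDFS URI, unlike a path that exists only on the master
val flightDF = spark.read
  .option("header", true)
  .csv("hdfs://namenode-host:8020/data/sampleflightdata")

flightDF.printSchema()
```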

On Fri, Sep 29, 2017 at 3:31 PM, Gaurav1809  wrote:

> Hi All,
>
> I have multi node architecture of (1 master,2 workers) Spark cluster, the
> job runs to read CSV file data and it works fine when run on local mode
> (Local(*)). However, when the same job is ran in cluster mode
> (Spark://HOST:PORT), it is not able to read it. I want to know how to
> reference the files Or where to store them? Currently the CSV data file is
> on master(from where the job is submitted).
>
> Following code works fine in local mode but not in cluster mode.
>
> val spark = SparkSession
>   .builder()
>   .appName("SampleFlightsApp")
>   .master("spark://masterIP:7077") // change it to .master("local[*])
> for local mode
>   .getOrCreate()
>
> val flightDF =
> spark.read.option("header",true).csv("/home/username/sampleflightdata")
> flightDF.printSchema()
>
> Error: FileNotFoundException: File file:/home/gaurav/sampleflightdata does
> not exist
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread Gaurav1809
Hi All,

I have a multi-node Spark cluster (1 master, 2 workers). The job reads CSV
file data and works fine when run in local mode (local[*]).
However, when the same job is run in cluster mode (spark://HOST:PORT), it is
not able to read the file.
I want to know how to reference the files, or where to store them? Currently
the CSV data file is on the master (from where the job is submitted).

Following code works fine in local mode but not in cluster mode.

val spark = SparkSession
  .builder()
  .appName("SampleFlightsApp")
  .master("spark://masterIP:7077") // change it to .master("local[*])
for local mode
  .getOrCreate()

val flightDF =
spark.read.option("header",true).csv("/home/username/sampleflightdata")
flightDF.printSchema()

Error: FileNotFoundException: File file:/home/username/sampleflightdata does
not exist



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread Jörn Franke
You should use a distributed filesystem such as HDFS. If you want to use the 
local filesystem then you have to copy each file to each node.

> On 29. Sep 2017, at 12:05, Gaurav1809  wrote:
> 
> Hi All,
> 
> I have multi node architecture of (1 master,2 workers) Spark cluster, the
> job runs to read CSV file data and it works fine when run on local mode
> (Local(*)). 
> However, when the same job is ran in cluster mode(Spark://HOST:PORT), it is
> not able to read it. 
> I want to know how to reference the files Or where to store them? Currently
> the CSV data file is on master(from where the job is submitted).
> 
> Following code works fine in local mode but not in cluster mode.
> 
> val spark = SparkSession
>  .builder()
>  .appName("SampleFlightsApp")
>  .master("spark://masterIP:7077") // change it to .master("local[*])
> for local mode
>  .getOrCreate()
> 
>val flightDF =
> spark.read.option("header",true).csv("/home/username/sampleflightdata")
>flightDF.printSchema()
> 
> Error: FileNotFoundException: File file:/home/username/sampleflightdata does
> not exist
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread Arun Rai
Or you can try mounting that drive to all node.

On Fri, Sep 29, 2017 at 6:14 AM Jörn Franke  wrote:

> You should use a distributed filesystem such as HDFS. If you want to use
> the local filesystem then you have to copy each file to each node.
>
> > On 29. Sep 2017, at 12:05, Gaurav1809  wrote:
> >
> > Hi All,
> >
> > I have multi node architecture of (1 master,2 workers) Spark cluster, the
> > job runs to read CSV file data and it works fine when run on local mode
> > (Local(*)).
> > However, when the same job is ran in cluster mode(Spark://HOST:PORT), it
> is
> > not able to read it.
> > I want to know how to reference the files Or where to store them?
> Currently
> > the CSV data file is on master(from where the job is submitted).
> >
> > Following code works fine in local mode but not in cluster mode.
> >
> > val spark = SparkSession
> >  .builder()
> >  .appName("SampleFlightsApp")
> >  .master("spark://masterIP:7077") // change it to .master("local[*])
> > for local mode
> >  .getOrCreate()
> >
> >val flightDF =
> > spark.read.option("header",true).csv("/home/username/sampleflightdata")
> >flightDF.printSchema()
> >
> > Error: FileNotFoundException: File file:/home/username/sampleflightdata
> does
> > not exist
> >
> >
> >
> > --
> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: What are factors need to Be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-29 Thread Gokula Krishnan D
Do you see any changes or improvements in the *Core API* in Spark 2.x when
compared with Spark 1.6.0?




Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Mon, Sep 25, 2017 at 1:32 PM, Gokula Krishnan D 
wrote:

> Thanks for the reply. Forgot to mention that, our Batch ETL Jobs are in
> Core-Spark.
>
>
> On Sep 22, 2017, at 3:13 PM, Vadim Semenov 
> wrote:
>
> 1. 40s is pretty negligible unless you run your job very frequently, there
> can be many factors that influence that.
>
> 2. Try to compare the CPU time instead of the wall-clock time
>
> 3. Check the stages that got slower and compare the DAGs
>
> 4. Test with dynamic allocation disabled
>
> On Fri, Sep 22, 2017 at 2:39 PM, Gokula Krishnan D 
> wrote:
>
>> Hello All,
>>
>> Currently our Batch ETL Jobs are in Spark 1.6.0 and planning to upgrade
>> into Spark 2.1.0.
>>
>> With minor code changes (like configuration and Spark Session.sc) able to
>> execute the existing JOB into Spark 2.1.0.
>>
>> But noticed that JOB completion timings are much better in Spark 1.6.0
>> but no in Spark 2.1.0.
>>
>> For the instance, JOB A completed in 50s in Spark 1.6.0.
>>
>> And with the same input and JOB A completed in 1.5 mins in Spark 2.1.0.
>>
>> Is there any specific factor needs to be considered when switching to
>> Spark 2.1.0 from Spark 1.6.0.
>>
>>
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>>
>
>
>


HDFS or NFS as a cache?

2017-09-29 Thread Alexander Czech
I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write
parquet files to S3. But the S3 performance for various reasons is bad when
I access s3 through the parquet write method:

df.write.parquet('s3a://bucket/parquet')

Now I want to set up a small cache for the parquet output. One output is
about 12-15 GB in size. Would it be enough to set up an NFS directory on the
master, write the output to it and then move it to S3? Or should I set up
HDFS on the master? Or should I even opt for an additional cluster running
an HDFS solution on more than one node?

thanks!


Re: [Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread Alexander Czech
Yes, you need to store the file at a location where it is equally
retrievable ("same path") for the master and all nodes in the cluster. A
simple solution (apart from HDFS) that does not scale too well, but might
be OK with only 3 nodes as in your configuration, is network-accessible
storage (a NAS or a shared folder, for example).

hope this helps
Alexander

On Fri, Sep 29, 2017 at 12:05 PM, Sathishkumar Manimoorthy <
mrsathishkuma...@gmail.com> wrote:

> Place it in HDFS and give the reference path in your code.
>
> Thanks,
> Sathish
>
> On Fri, Sep 29, 2017 at 3:31 PM, Gaurav1809 
> wrote:
>
>> Hi All,
>>
>> I have multi node architecture of (1 master,2 workers) Spark cluster, the
>> job runs to read CSV file data and it works fine when run on local mode
>> (Local(*)). However, when the same job is ran in cluster mode
>> (Spark://HOST:PORT), it is not able to read it. I want to know how to
>> reference the files Or where to store them? Currently the CSV data file is
>> on master(from where the job is submitted).
>>
>> Following code works fine in local mode but not in cluster mode.
>>
>> val spark = SparkSession
>>   .builder()
>>   .appName("SampleFlightsApp")
>>   .master("spark://masterIP:7077") // change it to .master("local[*])
>> for local mode
>>   .getOrCreate()
>>
>> val flightDF =
>> spark.read.option("header",true).csv("/home/username/sampleflightdata")
>> flightDF.printSchema()
>>
>> Error: FileNotFoundException: File file:/home/gaurav/sampleflightdata
>> does
>> not exist
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: What are factors need to Be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-29 Thread Yana Kadiyska
One thing to note, if you are using Mesos, is that the version of Mesos
changed from 0.21 to 1.0.0. So taking a newer Spark might push you into
larger infrastructure upgrades

On Fri, Sep 22, 2017 at 2:39 PM, Gokula Krishnan D 
wrote:

> Hello All,
>
> Currently our Batch ETL Jobs are in Spark 1.6.0 and planning to upgrade
> into Spark 2.1.0.
>
> With minor code changes (like configuration and Spark Session.sc) able to
> execute the existing JOB into Spark 2.1.0.
>
> But noticed that JOB completion timings are much better in Spark 1.6.0 but
> no in Spark 2.1.0.
>
> For the instance, JOB A completed in 50s in Spark 1.6.0.
>
> And with the same input and JOB A completed in 1.5 mins in Spark 2.1.0.
>
> Is there any specific factor needs to be considered when switching to
> Spark 2.1.0 from Spark 1.6.0.
>
>
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>


Saving dataframes with partitionBy: append partitions, overwrite within each

2017-09-29 Thread peay
Hello,

I am trying to use data_frame.write.partitionBy("day").save("dataset.parquet") 
to write a dataset while splitting by day.

I would like to run a Spark job  to process, e.g., a month:
dataset.parquet/day=2017-01-01/...
...

and then run another Spark job to add another month using the same folder 
structure, getting me
dataset.parquet/day=2017-01-01/
...
dataset.parquet/day=2017-02-01/
...

However:
- with save mode "overwrite", when I process the second month, all of 
dataset.parquet/ gets removed and I lose whatever was already computed for the 
previous month.
- with save mode "append", then I can't get idempotence: if I run the job to 
process a given month twice, I'll get duplicate data in all the subfolders for 
that month.

Is there a way to "append" in terms of the subfolders from partitionBy, but
overwrite within each such partition? Any help would be appreciated.

Thanks!

Re: HDFS or NFS as a cache?

2017-09-29 Thread Vadim Semenov
How many files do you produce? I believe it spends a lot of time on renaming
the files because of the output committer.
Also, instead of 5x c3.2xlarge, try using 2x c3.8xlarge instead, because they
have 10GbE and you can get good throughput to S3.
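
If the rename phase of the committer is the bottleneck, two mitigations that are often tried — fewer, larger output files and the version-2 FileOutputCommitter — look roughly like this (a sketch, assuming Hadoop 2.7+ and that the committer really is the cost here):

```
// version 2 of the FileOutputCommitter commits task output directly to the
// destination, skipping the second round of renames at job commit (Hadoop 2.7+)
spark.sparkContext.hadoopConfiguration
  .set("mapreduce.fileoutputcommitter.algorithm.version", "2")

// fewer, larger parquet files mean fewer objects to rename/copy on S3
df.coalesce(16).write.parquet("s3a://bucket/parquet")   // 16 is an arbitrary example
```

The same setting can also be passed at submit time as --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2, which works from PySpark as well.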

On Fri, Sep 29, 2017 at 9:15 AM, Alexander Czech <
alexander.cz...@googlemail.com> wrote:

> I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write
> parquet files to S3. But the S3 performance for various reasons is bad when
> I access s3 through the parquet write method:
>
> df.write.parquet('s3a://bucket/parquet')
>
> Now I want to setup a small cache for the parquet output. One output is
> about 12-15 GB in size. Would it be enough to setup a NFS-directory on the
> master, write the output to it and then move it to S3? Or should I setup a
> HDFS on the Master? Or should I even opt for an additional cluster running
> a HDFS solution on more than one node?
>
> thanks!
>


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2017-09-29 Thread Vadim Semenov
As alternative: checkpoint the dataframe, collect days, and then delete
corresponding directories using hadoop FileUtils, then write the dataframe
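
A rough Scala sketch of that approach, assuming `day` is a string column, `basePath` is the output root from the question (on the default filesystem), and `computeMonth()` stands in for whatever produces the month's DataFrame:

```
import org.apache.hadoop.fs.{FileSystem, Path}

val basePath = "dataset.parquet"              // output root from the question
val monthDF  = computeMonth().cache()         // or .checkpoint(), to avoid recomputation

// figure out which day partitions this run touches and drop just those directories
val days = monthDF.select("day").distinct().collect().map(_.getString(0))
val fs   = FileSystem.get(spark.sparkContext.hadoopConfiguration)
days.foreach(d => fs.delete(new Path(s"$basePath/day=$d"), true))   // recursive; no-op if missing

// append now only re-creates the partitions for this month, so re-runs are idempotent
monthDF.write.partitionBy("day").mode("append").parquet(basePath)
```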

On Fri, Sep 29, 2017 at 10:31 AM, peay  wrote:

> Hello,
>
> I am trying to use data_frame.write.partitionBy("day").save("dataset.parquet")
> to write a dataset while splitting by day.
>
> I would like to run a Spark job  to process, e.g., a month:
> dataset.parquet/day=2017-01-01/...
> ...
>
> and then run another Spark job to add another month using the same folder
> structure, getting me
> dataset.parquet/day=2017-01-01/
> ...
> dataset.parquet/day=2017-02-01/ 
> ...
>
> However:
> - with save mode "overwrite", when I process the second month, all of
> dataset.parquet/ gets removed and I lose whatever was already computed for
> the previous month.
> - with save mode "append", then I can't get idempotence: if I run the job
> to process a given month twice, I'll get duplicate data in all the
> subfolders for that month.
>
> Is there a way to do "append in terms of the subfolders from partitionBy,
> but overwrite within each such partitions? Any help would be appreciated.
>
> Thanks!
>


Re: More instances = slower Spark job

2017-09-29 Thread Alexander Czech
Does each gzip file look like this:

{json1}
{json2}
{json3}

meaning that each line is a separate json object?

I process a similarly large file batch, and what I do is this:

# each line in input.txt is a path to a gzip file containing one json object per line
paths = open('input.txt').read().splitlines()
my_rdd = sc.parallelize(paths)               # creates an RDD with each file path as a row
my_rdd = my_rdd.flatMap(open_files)          # opens the files and yields them line by line
my_rdd = my_rdd.map(do_something_with_files) # now do something with each line

The important part, at least in Python, is the yield, because it makes the
function memory efficient. You could even go further and only yield a json
line if it matches the regex criteria, saving you the map(). Maybe yield a
(path, json) pair to later reconstruct which line goes into which file.
Reduce the RDD and write out the file.

If all the files in input.txt are too big to be processed at once, consider
dividing input.txt into smaller chunks and processing each chunk individually.

On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta  wrote:

> I think that Vadim's response makes a lot of sense in terms of utilizing
> SPARK. Why are you not using JSON reader of SPARK? Your input has to follow
> a particular JSON style, but then it would be interesting to know whether
> you have looked into it at all.
>
> If you are going to read them only once then there is really no need to
> convert them and then read them.
>
> I will be really interested to hear in case you were able to using json
> reader natively available in SPARK.
>
>
> Regards,
> Gourav
>
> On Thu, Sep 28, 2017 at 8:16 PM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> Instead of having one job, you can try processing each file in a separate
>> job, but run multiple jobs in parallel within one SparkContext.
>> Something like this should work for you, it'll submit N jobs from the
>> driver, the jobs will run independently, but executors will dynamically
>> work on different jobs, so you'll utilize executors at full.
>>
>> ```
>> import org.apache.spark.sql.SparkSession
>>
>> import scala.collection.parallel.ForkJoinTaskSupport
>>
>> val spark: SparkSession
>> val files: Seq[String]
>> val filesParallelCollection = files.toParArray
>> val howManyFilesToProcessInParallel = math.min(50, files.length)
>>
>> filesParallelCollection.tasksupport = new ForkJoinTaskSupport()(
>>   new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
>> )
>> filesParallelCollection.foreach(file => {
>>   spark.read.text(file).filter(…)…
>> })
>> ```
>>
>> On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller 
>> wrote:
>>
>>> More details on what I want to achieve. Maybe someone can suggest a
>>> course of action.
>>>
>>> My processing is extremely simple: reading .json.gz text
>>> files, filtering each line according a regex, and saving the surviving
>>> lines in a similarly named .gz file.
>>>
>>> Unfortunately changing the data format is impossible (we are dealing
>>> with terabytes here) so I need to process .gz files.
>>>
>>> Ideally, I would like to process a large number of such files in
>>> parallel, that is using n_e executors which would each take care of a
>>> fraction 1/n_e of all the files. The trouble is that I don't know how
>>> to process files in parallel without loading them in the driver first,
>>> which would result in a major bottleneck.
>>>
>>> Here was my naive approach in Scala-like pseudo-code:
>>>
>>> //
>>> // This happens on the driver
>>> //
>>> val files = List("s3://bckt/file-1.json.gz", ...,
>>> "s3://bckt/file-N.json.gz")
>>> val files_rdd = sc.parallelize(files, num_partitions)
>>> //
>>> // Now files_rdd (which only holds file names) is distributed across
>>> several executors
>>> // and/or nodes
>>> //
>>>
>>> files_rdd.foreach(
>>> //
>>> // It is my understanding that what is performed within the foreach
>>> method
>>> // will be parallelized on several executors / nodes
>>> //
>>> file => {
>>> //
>>> // This would happen on a given executor: a given input file
>>> is processed
>>> // entirely on a given executor
>>> //
>>> val lines = sc.read.text(file)
>>> val filtered_lines = lines.filter( // filter based on regex // )
>>> filtered_lines.write.option("compression",
>>> "gzip").text("a_filename_tbd")
>>> }
>>> )
>>>
>>> Unfortunately this is not possible since the Spark context sc is
>>> defined in the driver and cannot be shared.
>>>
>>> My problem would be entirely solved if I could manage to read files
>>> not from the driver, but from a given executor.
>>>
>>> Another possibility would be to read each .gz file in the driver
>>> (about 2GB each), materializing the whole resulting RDD on the driver
>>> (around 12GB) and then calling repartition on that RDD, but only the
>>> regex part would be parallelized, and the data shuffling will probably
>>> ruin the performance.
>>>
>>> Any idea?
>>>
>>> 

Re: HDFS or NFS as a cache?

2017-09-29 Thread Alexander Czech
Yes, I have identified the rename as the problem; that is why I think the
extra bandwidth of the larger instances might not help. Also, there is a
consistency issue with S3 because of how the rename works, so I could
probably lose data.

On Fri, Sep 29, 2017 at 4:42 PM, Vadim Semenov 
wrote:

> How many files you produce? I believe it spends a lot of time on renaming
> the files because of the output committer.
> Also instead of 5x c3.2xlarge try using 2x c3.8xlarge instead because they
> have 10GbE and you can get good throughput for S3.
>
> On Fri, Sep 29, 2017 at 9:15 AM, Alexander Czech <
> alexander.cz...@googlemail.com> wrote:
>
>> I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write
>> parquet files to S3. But the S3 performance for various reasons is bad when
>> I access s3 through the parquet write method:
>>
>> df.write.parquet('s3a://bucket/parquet')
>>
>> Now I want to setup a small cache for the parquet output. One output is
>> about 12-15 GB in size. Would it be enough to setup a NFS-directory on the
>> master, write the output to it and then move it to S3? Or should I setup a
>> HDFS on the Master? Or should I even opt for an additional cluster running
>> a HDFS solution on more than one node?
>>
>> thanks!
>>
>
>


Re: [Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread lucas.g...@gmail.com
We use S3, there are caveats and issues with that but it can be made to
work.

If interested let me know and I'll show you our workarounds.  I wouldn't do
it naively though, there's lots of potential problems.  If you already have
HDFS use that, otherwise all things told it's probably less effort to use
S3.

Gary

On 29 September 2017 at 05:03, Arun Rai  wrote:

> Or you can try mounting that drive to all node.
>
> On Fri, Sep 29, 2017 at 6:14 AM Jörn Franke  wrote:
>
>> You should use a distributed filesystem such as HDFS. If you want to use
>> the local filesystem then you have to copy each file to each node.
>>
>> > On 29. Sep 2017, at 12:05, Gaurav1809  wrote:
>> >
>> > Hi All,
>> >
>> > I have multi node architecture of (1 master,2 workers) Spark cluster,
>> the
>> > job runs to read CSV file data and it works fine when run on local mode
>> > (Local(*)).
>> > However, when the same job is ran in cluster mode(Spark://HOST:PORT),
>> it is
>> > not able to read it.
>> > I want to know how to reference the files Or where to store them?
>> Currently
>> > the CSV data file is on master(from where the job is submitted).
>> >
>> > Following code works fine in local mode but not in cluster mode.
>> >
>> > val spark = SparkSession
>> >  .builder()
>> >  .appName("SampleFlightsApp")
>> >  .master("spark://masterIP:7077") // change it to
>> .master("local[*])
>> > for local mode
>> >  .getOrCreate()
>> >
>> >val flightDF =
>> > spark.read.option("header",true).csv("/home/username/sampleflightdata")
>> >flightDF.printSchema()
>> >
>> > Error: FileNotFoundException: File file:/home/username/sampleflightdata
>> does
>> > not exist
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: More instances = slower Spark job

2017-09-29 Thread Gourav Sengupta
I think that the best option is to see whether the DataFrame option for
reading JSON files works or not.



On Fri, Sep 29, 2017 at 3:53 PM, Alexander Czech <
alexander.cz...@googlemail.com> wrote:

> Does each gzip file look like this:
>
> {json1}
> {json2}
> {json3}
>
> meaning that each line is a separate json object?
>
> I proccess a similar large file batch and what I do is this:
>
> input.txt # each line in input.txt represents a path to a gzip file each
> containing a json object every line
> my_rdd = sc.parallelize(input.txt) # creats a rdd with each file_path as
> a row
> my_rdd = my_rdd.flatmap(open_files) # opens the files and yields them line
> by line
> my_rdd = my_rdd.map(do_something_with_files) # now do something with each
> line
>
> the important part at least in python is the yield, because it makes the
> function memory efficient. You could even go further and only yield a json
> if it matches the regex criteria saving you the map(). Maybe yield a
> (path,json) pair to later reconstruct which line goes into which file.
> Reduce the rdd and write out the file.
>
> If all files in input.txt are to big to be processed at once consider
> dividing input.txt into smaller chunks and process each chunk individually.
>
> On Fri, Sep 29, 2017 at 12:20 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> I think that Vadim's response makes a lot of sense in terms of utilizing
>> SPARK. Why are you not using JSON reader of SPARK? Your input has to follow
>> a particular JSON style, but then it would be interesting to know whether
>> you have looked into it at all.
>>
>> If you are going to read them only once then there is really no need to
>> convert them and then read them.
>>
>> I will be really interested to hear in case you were able to using json
>> reader natively available in SPARK.
>>
>>
>> Regards,
>> Gourav
>>
>> On Thu, Sep 28, 2017 at 8:16 PM, Vadim Semenov <
>> vadim.seme...@datadoghq.com> wrote:
>>
>>> Instead of having one job, you can try processing each file in a
>>> separate job, but run multiple jobs in parallel within one SparkContext.
>>> Something like this should work for you, it'll submit N jobs from the
>>> driver, the jobs will run independently, but executors will dynamically
>>> work on different jobs, so you'll utilize executors at full.
>>>
>>> ```
>>> import org.apache.spark.sql.SparkSession
>>>
>>> import scala.collection.parallel.ForkJoinTaskSupport
>>>
>>> val spark: SparkSession
>>> val files: Seq[String]
>>> val filesParallelCollection = files.toParArray
>>> val howManyFilesToProcessInParallel = math.min(50, files.length)
>>>
>>> filesParallelCollection.tasksupport = new ForkJoinTaskSupport()(
>>>   new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
>>> )
>>> filesParallelCollection.foreach(file => {
>>>   spark.read.text(file).filter(…)…
>>> })
>>> ```
>>>
>>> On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller 
>>> wrote:
>>>
 More details on what I want to achieve. Maybe someone can suggest a
 course of action.

 My processing is extremely simple: reading .json.gz text
 files, filtering each line according a regex, and saving the surviving
 lines in a similarly named .gz file.

 Unfortunately changing the data format is impossible (we are dealing
 with terabytes here) so I need to process .gz files.

 Ideally, I would like to process a large number of such files in
 parallel, that is using n_e executors which would each take care of a
 fraction 1/n_e of all the files. The trouble is that I don't know how
 to process files in parallel without loading them in the driver first,
 which would result in a major bottleneck.

 Here was my naive approach in Scala-like pseudo-code:

 //
 // This happens on the driver
 //
 val files = List("s3://bckt/file-1.json.gz", ...,
 "s3://bckt/file-N.json.gz")
 val files_rdd = sc.parallelize(files, num_partitions)
 //
 // Now files_rdd (which only holds file names) is distributed across
 several executors
 // and/or nodes
 //

 files_rdd.foreach(
 //
 // It is my understanding that what is performed within the foreach
 method
 // will be parallelized on several executors / nodes
 //
 file => {
 //
 // This would happen on a given executor: a given input file
 is processed
 // entirely on a given executor
 //
 val lines = sc.read.text(file)
 val filtered_lines = lines.filter( // filter based on regex // )
 filtered_lines.write.option("compression",
 "gzip").text("a_filename_tbd")
 }
 )

 Unfortunately this is not possible since the Spark context sc is
 defined in the driver and cannot be shared.

 My problem would be entirely solved if I could manage to read files
 not from the driver, but from a given executor.

Needed some best practices to integrate Spark with HBase

2017-09-29 Thread Debabrata Ghosh
Dear All,
 Greetings !

 I need some best practices for integrating Spark
with HBase. Would you be able to point me to some useful resources / URLs,
at your convenience, please?

Thanks,

Debu
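
For what it's worth, one common starting point is reading an HBase table through its Hadoop InputFormat; the sketch below is illustrative only, with the table name as a placeholder:

```
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// read an HBase table as an RDD of (row key, Result) pairs via the Hadoop InputFormat
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "my_table")   // placeholder table name

val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

println(hbaseRDD.count())
```

Beyond that, the hbase-spark module and the Hortonworks Spark-HBase connector (SHC) are the usual DataFrame-level options to look at.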


Re: [Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread Imran Rajjad
Try Tachyon... it's less fuss.


On Fri, 29 Sep 2017 at 8:32 PM lucas.g...@gmail.com 
wrote:

> We use S3, there are caveats and issues with that but it can be made to
> work.
>
> If interested let me know and I'll show you our workarounds.  I wouldn't
> do it naively though, there's lots of potential problems.  If you already
> have HDFS use that, otherwise all things told it's probably less effort to
> use S3.
>
> Gary
>
> On 29 September 2017 at 05:03, Arun Rai  wrote:
>
>> Or you can try mounting that drive to all node.
>>
>> On Fri, Sep 29, 2017 at 6:14 AM Jörn Franke  wrote:
>>
>>> You should use a distributed filesystem such as HDFS. If you want to use
>>> the local filesystem then you have to copy each file to each node.
>>>
>>> > On 29. Sep 2017, at 12:05, Gaurav1809  wrote:
>>> >
>>> > Hi All,
>>> >
>>> > I have multi node architecture of (1 master,2 workers) Spark cluster,
>>> the
>>> > job runs to read CSV file data and it works fine when run on local mode
>>> > (Local(*)).
>>> > However, when the same job is ran in cluster mode(Spark://HOST:PORT),
>>> it is
>>> > not able to read it.
>>> > I want to know how to reference the files Or where to store them?
>>> Currently
>>> > the CSV data file is on master(from where the job is submitted).
>>> >
>>> > Following code works fine in local mode but not in cluster mode.
>>> >
>>> > val spark = SparkSession
>>> >  .builder()
>>> >  .appName("SampleFlightsApp")
>>> >  .master("spark://masterIP:7077") // change it to
>>> .master("local[*])
>>> > for local mode
>>> >  .getOrCreate()
>>> >
>>> >    val flightDF =
>>> > spark.read.option("header",true).csv("/home/username/sampleflightdata")
>>> >    flightDF.printSchema()
>>> >
>>> > Error: FileNotFoundException: File
>>> file:/home/username/sampleflightdata does
>>> > not exist
>>> >
>>> > --
>>> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>> >
>>> > -
>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>
>>
>
>
> --
Sent from Gmail Mobile


Re: More instances = slower Spark job

2017-09-29 Thread Vadim Semenov
Hi Jeroen,

> However, am I correct in assuming that all the filtering will be then
performed on the driver (since the .gz files are not splittable), albeit in
several threads?

Filtering will not happen on the driver; it'll happen on the executors, since
`spark.read.json(…).filter(…).write(…)` is a separate job. But you have to
submit each job in a separate thread, because each thread gets blocked
until the corresponding job finishes; that's why you have to use
`parallel collections`. You could also just use Futures, but it's easier to
use a `ParArray`.

Internally it will work this way: once one task finishes decompressing a
file, many tasks will get scheduled (based on `spark.default.parallelism`),
and the executor that decompressed the file will start processing lines
using all available threads, and after some time additional executors may
join (based on the locality levels), and then after filtering, you would
have to repartition back to 1 partition, so you could write just one
`.gzip` file.

And for each file, there will be a separate job, but because they all run
within one Spark Context, executors will stay with the job, and will work
on all files simultaneously.
See more about scheduling within one application:
https://spark.apache.org/docs/2.2.0/job-scheduling.html#scheduling-within-an-application
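
A sketch of the per-file job body described above, reusing `filesParallelCollection` from the earlier snippet (the regex and output naming are placeholders):

```
// each call runs as an independent job; gzip is not splittable, so the read is
// a single task, and repartition(1) keeps one output file per input file
def processOne(file: String): Unit = {
  val out = file.stripSuffix(".json.gz") + "-filtered"   // output directory (assumed naming scheme)
  spark.read.textFile(file)
    .filter(_.matches(".*pattern-to-keep.*"))            // placeholder regex
    .repartition(1)
    .write
    .option("compression", "gzip")
    .text(out)
}

filesParallelCollection.foreach(file => processOne(file))
```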

On Fri, Sep 29, 2017 at 12:58 PM, Jeroen Miller 
wrote:

> On Thu, Sep 28, 2017 at 11:55 PM, Jeroen Miller 
> wrote:
> > On Thu, Sep 28, 2017 at 9:16 PM, Vadim Semenov
> >  wrote:
> >> Instead of having one job, you can try processing each file in a
> separate
> >> job, but run multiple jobs in parallel within one SparkContext.
>
> Hello Vadim,
>
> Today was a bit busy and I did not have the time to play with your
> idea. However, am I correct in assuming that all the filtering will be
> then performed on the driver (since the .gz files are not splittable),
> albeit in several threads?
>
> If this is correct, then I guess the proper way to tackle this task
> would be to run without any executors, but using all the cores and
> memory of the machine for the driver?
>
> I will keep you posted on my progress,
>
> Thanks,
>
> Jeroen
>


RE: [Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread JG Perrin
On a test system, you can also use something like Owncloud/Nextcloud/Dropbox to
ensure that the files are synchronized. Would not do it for TB of data ;) ...

-Original Message-
From: Jörn Franke [mailto:jornfra...@gmail.com] 
Sent: Friday, September 29, 2017 5:14 AM
To: Gaurav1809 
Cc: user@spark.apache.org
Subject: Re: [Spark-Submit] Where to store data files while running job in 
cluster mode?

You should use a distributed filesystem such as HDFS. If you want to use the 
local filesystem then you have to copy each file to each node.

> On 29. Sep 2017, at 12:05, Gaurav1809  wrote:
> 
> Hi All,
> 
> I have multi node architecture of (1 master,2 workers) Spark cluster, 
> the job runs to read CSV file data and it works fine when run on local 
> mode (Local(*)).
> However, when the same job is ran in cluster mode(Spark://HOST:PORT), 
> it is not able to read it.
> I want to know how to reference the files Or where to store them? 
> Currently the CSV data file is on master(from where the job is submitted).
> 
> Following code works fine in local mode but not in cluster mode.
> 
> val spark = SparkSession
>  .builder()
>  .appName("SampleFlightsApp")
>  .master("spark://masterIP:7077") // change it to 
> .master("local[*]) for local mode
>  .getOrCreate()
> 
>val flightDF =
> spark.read.option("header",true).csv("/home/username/sampleflightdata")
>flightDF.printSchema()
> 
> Error: FileNotFoundException: File 
> file:/home/username/sampleflightdata does not exist
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: HDFS or NFS as a cache?

2017-09-29 Thread JG Perrin
You will collect in the driver (often the master) and it will save the data, so 
for saving, you will not have to set up HDFS.

From: Alexander Czech [mailto:alexander.cz...@googlemail.com]
Sent: Friday, September 29, 2017 8:15 AM
To: user@spark.apache.org
Subject: HDFS or NFS as a cache?

I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write parquet 
files to S3. But the S3 performance for various reasons is bad when I access s3 
through the parquet write method:

df.write.parquet('s3a://bucket/parquet')
Now I want to setup a small cache for the parquet output. One output is about 
12-15 GB in size. Would it be enough to setup a NFS-directory on the master, 
write the output to it and then move it to S3? Or should I setup a HDFS on the 
Master? Or should I even opt for an additional cluster running a HDFS solution 
on more than one node?
thanks!


Crash in Unit Tests

2017-09-29 Thread Anthony Thomas
Hi Spark Users,

I recently compiled spark 2.2.0 from source on an EC2 m4.2xlarge instance
(8 cores, 32G RAM) running Ubuntu 14.04. I'm using Oracle Java 1.8. I
compiled using the command:

export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests -Pnetlib-lgpl clean package

Spark compiles fine, but when running tests (./build/mvn test), at least
one test in "StandaloneDynamicAllocationSuite" of "Spark Project Core"
consistently cause the JVM to crash with errors like:

 Exception in thread "ExecutorRunner for app-20170929185545-/0"
java.lang.OutOfMemoryError: unable to create new native thread

According to "cat /proc/sys/kernel/threads-max" the system can support up
to 120,000 threads which seems like it should be more than enough. The
limits set in ulimit also seem reasonable. Looking at "top," the JVM
doesn't seem to be using anywhere close to 32GB of RAM.

Has anyone else encountered similar issues or have any suggestions about
where to go in diagnosing the cause of this problem? Alternatively, is this
a problem I can safely ignore? Running some short code segments on Spark
seems to work just fine, but I'm wondering if this will become a problem at
heavy loads. Please let me know if there's any other info that would be
helpful.


Re: Crash in Unit Tests

2017-09-29 Thread Eduardo Mello
I had this problem at my work.

I solved it by increasing the Unix ulimit, because Spark is trying to open too
many files.

Em 29 de set de 2017 5:05 PM, "Anthony Thomas" 
escreveu:

> Hi Spark Users,
>
> I recently compiled spark 2.2.0 from source on an EC2 m4.2xlarge instance
> (8 cores, 32G RAM) running Ubuntu 14.04. I'm using Oracle Java 1.8. I
> compiled using the command:
>
> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
> ./build/mvn -DskipTests -Pnetlib-lgpl clean package
>
> Spark compiles fine, but when running tests (./build/mvn test), at least
> one test in "StandaloneDynamicAllocationSuite" of "Spark Project Core"
> consistently cause the JVM to crash with errors like:
>
>  Exception in thread "ExecutorRunner for app-20170929185545-/0"
> java.lang.OutOfMemoryError: unable to create new native thread
>
> According to "cat /proc/sys/kernel/threads-max" the system can support up
> to 120,000 threads which seems like it should be more than enough. The
> limits set in ulimit also seem reasonable. Looking at "top," the JVM
> doesn't seem to be using anywhere close to 32GB of RAM.
>
> Has anyone else encountered similar issues or have any suggestions about
> where to go in diagnosing the cause of this problem? Alternatively, is
> this a problem I can safely ignore? Running some short code segments on
> Spark seems to work just fine, but I'm wondering if this will become a
> problem at heavy loads. Please let me know if there's any other info that
> would be helpful.
>


Re: [Spark-Submit] Where to store data files while running job in cluster mode?

2017-09-29 Thread vaquar khan
If you're running in cluster mode, the file needs to be reachable from all
the nodes, either on a shared file system or by copying it to each of them:

1) put it into a distributed filesystem such as HDFS, or

2) transfer/sftp the file onto each worker node before running the Spark job,
and then pass the path of the file on the worker filesystem as the argument
of textFile (see the sketch below).
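
A tiny illustration of option 2) — the path is a placeholder and must exist at the same location on the driver and on every worker node:

```
// explicit file:// URI; works only because the same file was copied to every node
val flightDF = spark.read
  .option("header", true)
  .csv("file:///data/sampleflightdata")
```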

Regards,
Vaquar khan

On Fri, Sep 29, 2017 at 2:00 PM, JG Perrin  wrote:

> On a test system, you can also use something like
> Owncloud/Nextcloud/Dropbox to insure that the files are synchronized. Would
> not do it for TB of data ;) ...
>
> -Original Message-
> From: Jörn Franke [mailto:jornfra...@gmail.com]
> Sent: Friday, September 29, 2017 5:14 AM
> To: Gaurav1809 
> Cc: user@spark.apache.org
> Subject: Re: [Spark-Submit] Where to store data files while running job in
> cluster mode?
>
> You should use a distributed filesystem such as HDFS. If you want to use
> the local filesystem then you have to copy each file to each node.
>
> > On 29. Sep 2017, at 12:05, Gaurav1809  wrote:
> >
> > Hi All,
> >
> > I have multi node architecture of (1 master,2 workers) Spark cluster,
> > the job runs to read CSV file data and it works fine when run on local
> > mode (Local(*)).
> > However, when the same job is ran in cluster mode(Spark://HOST:PORT),
> > it is not able to read it.
> > I want to know how to reference the files Or where to store them?
> > Currently the CSV data file is on master(from where the job is
> submitted).
> >
> > Following code works fine in local mode but not in cluster mode.
> >
> > val spark = SparkSession
> >  .builder()
> >  .appName("SampleFlightsApp")
> >  .master("spark://masterIP:7077") // change it to
> > .master("local[*]) for local mode
> >  .getOrCreate()
> >
> >val flightDF =
> > spark.read.option("header",true).csv("/home/username/sampleflightdata")
> >flightDF.printSchema()
> >
> > Error: FileNotFoundException: File
> > file:/home/username/sampleflightdata does not exist
> >
> >
> >
> > --
> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago


[Structured Streaming] How to compute the difference between two rows of a streaming dataframe?

2017-09-29 Thread 张万新
Hi,

I want to compute the difference between two rows in a streaming dataframe.
Is there a feasible API? Maybe some function like the window function *lag*
on a normal dataframe, but it seems that this function is unavailable on
streaming dataframes.

Thanks.
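
Since lag and other non-time window functions are indeed not supported on streaming DataFrames (as of 2.2), one pattern that can stand in for it is keeping the previous value per key in state with mapGroupsWithState. Below is a rough Scala sketch — the record types and field names are assumptions, it only diffs the latest value per key across triggers, and the query has to run in update output mode:

```
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.GroupState

// illustrative schema: adapt key/timestamp/value to the real columns
case class Reading(key: String, ts: java.sql.Timestamp, value: Double)
case class Delta(key: String, ts: java.sql.Timestamp, delta: Double)

def withDelta(readings: Dataset[Reading]): Dataset[Delta] = {
  import readings.sparkSession.implicits._
  readings
    .groupByKey(_.key)
    .mapGroupsWithState[Double, Delta] { (key, rows, state) =>
      val current  = rows.toSeq.maxBy(_.ts.getTime)             // newest row for the key in this trigger
      val previous = state.getOption.getOrElse(current.value)   // first row diffs against itself
      state.update(current.value)                               // remember the value for the next trigger
      Delta(key, current.ts, current.value - previous)
    }
}
```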