[Spark] Expose Spark to Untrusted Users?

2017-10-02 Thread Jack Leadford
Hello,

I would like to expose Apache Spark to untrusted users (through Livy, and with 
a direct
JDBC connection).

However, there appear to be a variety of avenues wherein one of these untrusted 
users
can execute arbitrary code (by design): PySpark, SparkR, Jar uploads, various 
UDFs, etc.

I would like to prevent my untrusted users from executing arbitrary remote code.
I have found small bits of information relating to this[0][1], but nothing 
comprehensive
or prescriptive.

I understand that this is not exactly Spark’s use case, but any thoughts or opinions with regard to this would be appreciated, especially if there is an established process for handling this scenario.

Thanks.

Jack

0: 
https://stackoverflow.com/questions/38333873/securely-running-a-spark-application-inside-a-sandbox
1: 
https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.server2.builtin.udf.whitelist




Re: Example of GBTClassifier

2017-10-02 Thread Weichen Xu
It should be an Eclipse issue. The method is there, inherited from the superclass
`Predictor`.

On Mon, Oct 2, 2017 at 11:51 PM, mckunkel  wrote:

> Greetings,
> I am trying to run the example in the example directory for the
> GBTClassifier. But when I view this code in eclipse, I get an error such
> that
> "The method setLabelCol(String) is undefined for the type GBTClassifier"
> For the line
>
> GBTClassifier gbt = new
> GBTClassifier().setLabelCol("indexedLabel").setFeaturesCol(
> "indexedFeatures")
> .setMaxIter(10);
>
> However the API says this method is there, eclipse does not.
> I did a straight copy paste, including all imports.
>
> Someone please help.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Quick one... AWS SDK version?

2017-10-02 Thread JG Perrin
Hey Sparkians,

What version of AWS Java SDK do you use with Spark 2.2? Do you stick with the 
Hadoop 2.7.3 libs?

Thanks!

jg


Re: PySpark - Expand rows into dataframes via function

2017-10-02 Thread Sathish Kumaran Vairavelu
It's possible with the array function combined with the struct construct. Below is
a SQL example:

select Array(struct(ip1,hashkey), struct(ip2,hashkey))
from (select substr(col1,1,2) as ip1, substr(col1,3,3) as ip2, etc, hashkey
from object) a

If you want dynamic IP ranges, you need to construct the structs dynamically
based on the range values. Hope this helps.
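
For illustration, here is a rough PySpark (DataFrame API) sketch of the same idea with toy data; the column names col1/hashkey come from the example above, and explode turns the array of structs into one row per IP:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the "object" table in the SQL above.
df = spark.createDataFrame(
    [("23239", "ff26920a408f15613096aa7fe0ddaa57")], ["col1", "hashkey"])

exploded = (df
    # Build an array of (ip, hashkey) structs, one struct per derived IP column.
    .select(F.array(
        F.struct(F.substring("col1", 1, 2).alias("ip"), F.col("hashkey")),
        F.struct(F.substring("col1", 3, 3).alias("ip"), F.col("hashkey")),
    ).alias("ips"))
    # explode() emits one row per struct; then flatten the struct fields.
    .select(F.explode("ips").alias("r"))
    .select("r.ip", "r.hashkey"))

exploded.show()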


Thanks

Sathish

On Mon, Oct 2, 2017 at 9:01 AM Patrick McCarthy 
wrote:

> Hello,
>
> I'm trying to map ARIN registry files into more explicit IP ranges. They
> provide a number of IPs in the range (here it's 8192) and a starting IP,
> and I'm trying to map it into all the included /24 subnets. For example,
>
> Input:
>
> array(['arin', 'US', 'ipv4', '23.239.160.0', 8192, 20131104.0, 'allocated',
>
>'ff26920a408f15613096aa7fe0ddaa57'], dtype=object)
>
>
> Output:
>
> array([['23', '239', '160', 'ff26920a408f15613096aa7fe0ddaa57'],
>['23', '239', '161', 'ff26920a408f15613096aa7fe0ddaa57'],
>['23', '239', '162', 'ff26920a408f15613096aa7fe0ddaa57'],
>
> ...
>
>
> I have the input lookup table in a pyspark DF, and a python function to do 
> the conversion into the mapped output. I think to produce the full mapping I 
> need a UDTF but this concept doesn't seem to exist in PySpark. What's the 
> best approach to do this mapping and recombine into a new DataFrame?
>
>
> Thanks,
>
> Patrick
>
>


Re: HDFS or NFS as a cache?

2017-10-02 Thread Miguel Morales
See: https://github.com/rdblue/s3committer and
https://www.youtube.com/watch?v=8F2Jqw5_OnI


On Mon, Oct 2, 2017 at 11:31 AM, Marcelo Vanzin  wrote:

> You don't need to collect data in the driver to save it. The code in
> the original question doesn't use "collect()", so it's actually doing
> a distributed write.
>
>
> On Mon, Oct 2, 2017 at 11:26 AM, JG Perrin  wrote:
> > Steve,
> >
> >
> >
> > If I refer to the collect() API, it says “Running collect requires moving
> > all the data into the application's driver process, and doing so on a
> very
> > large dataset can crash the driver process with OutOfMemoryError.” So why
> > would you need a distributed FS?
> >
> >
> >
> > jg
> >
> >
> >
> > From: Steve Loughran [mailto:ste...@hortonworks.com]
> > Sent: Saturday, September 30, 2017 6:10 AM
> > To: JG Perrin 
> > Cc: Alexander Czech ;
> user@spark.apache.org
> > Subject: Re: HDFS or NFS as a cache?
> >
> >
> >
> >
> >
> > On 29 Sep 2017, at 20:03, JG Perrin  wrote:
> >
> >
> >
> > You will collect in the driver (often the master) and it will save the
> data,
> > so for saving, you will not have to set up HDFS.
> >
> >
> >
> > no, it doesn't work quite like that.
> >
> >
> >
> > 1. workers generate their data and save somewhere
> >
> > 2. on "task commit" they move their data to some location where it will
> be
> > visible for "job commit" (rename, upload, whatever)
> >
> > 3. job commit —which is done in the driver,— takes all the committed task
> > data and makes it visible in the destination directory.
> >
> > 4. Then they create a _SUCCESS file to say "done!"
> >
> >
> >
> >
> >
> > This is done with Spark talking between workers and drivers to guarantee
> > that only one task working on a specific part of the data commits their
> > work, only
> >
> > committing the job once all tasks have finished
> >
> >
> >
> > The v1 mapreduce committer implements (2) by moving files under a job
> > attempt dir, and (3) by moving it from the job attempt dir to the
> > destination. one rename per task commit, another rename of every file on job
> > commit. In HDFS, Azure wasb and other stores with an O(1) atomic rename,
> > this isn't *too* expensive, though that final job commit rename still takes
> > time to list and move lots of files
> >
> >
> >
> > The v2 committer implements (2) by renaming to the destination directory and
> > (3) as a no-op. Rename in the tasks then, but not that second,
> > serialized one at the end
> >
> >
> >
> > There's no copy of data from workers to driver, instead you need a shared
> > output filesystem so that the job committer can do its work alongside the
> > tasks.
> >
> >
> >
> > There are alternative committer algorithms,
> >
> >
> >
> > 1. look at Ryan Blue's talk: https://www.youtube.com/watch?v=BgHrff5yAQo
> >
> > 2. IBM Stocator paper (https://arxiv.org/abs/1709.01812) and code
> > (https://github.com/SparkTC/stocator/)
> >
> > 3. Ongoing work in Hadoop itself for better committers. Goal: year end &
> > Hadoop 3.1 https://issues.apache.org/jira/browse/HADOOP-13786 . The code is
> > all there, Parquet is a trouble spot, and more testing is welcome from anyone
> > who wants to help.
> >
> > 4. Databricks have "something"; specifics aren't covered, but I assume it's
> > DynamoDB based
> >
> >
> >
> >
> >
> > -Steve
> >
> > From: Alexander Czech [mailto:alexander.cz...@googlemail.com]
> > Sent: Friday, September 29, 2017 8:15 AM
> > To: user@spark.apache.org
> > Subject: HDFS or NFS as a cache?
> >
> >
> >
> > I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write
> > parquet files to S3. But the S3 performance for various reasons is bad
> when
> > I access s3 through the parquet write method:
> >
> > df.write.parquet('s3a://bucket/parquet')
> >
> > Now I want to setup a small cache for the parquet output. One output is
> > about 12-15 GB in size. Would it be enough to setup a NFS-directory on
> the
> > master, write the output to it and then move it to S3? Or should I setup
> a
> > HDFS on the Master? Or should I even opt for an additional cluster
> running a
> > HDFS solution on more than one node?
> >
> > thanks!
> >
> >
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: HDFS or NFS as a cache?

2017-10-02 Thread Marcelo Vanzin
You don't need to collect data in the driver to save it. The code in
the original question doesn't use "collect()", so it's actually doing
a distributed write.
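
For illustration, a minimal PySpark sketch of such a distributed write (the path is a placeholder): each executor writes its own part files, and nothing is funnelled through the driver, so no collect() is involved.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100000)

# The write runs on the executors, which each produce their own part files
# under the target directory; the driver only coordinates the job commit.
df.write.mode("overwrite").parquet("s3a://bucket/parquet")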


On Mon, Oct 2, 2017 at 11:26 AM, JG Perrin  wrote:
> Steve,
>
>
>
> If I refer to the collect() API, it says “Running collect requires moving
> all the data into the application's driver process, and doing so on a very
> large dataset can crash the driver process with OutOfMemoryError.” So why
> would you need a distributed FS?
>
>
>
> jg
>
>
>
> From: Steve Loughran [mailto:ste...@hortonworks.com]
> Sent: Saturday, September 30, 2017 6:10 AM
> To: JG Perrin 
> Cc: Alexander Czech ; user@spark.apache.org
> Subject: Re: HDFS or NFS as a cache?
>
>
>
>
>
> On 29 Sep 2017, at 20:03, JG Perrin  wrote:
>
>
>
> You will collect in the driver (often the master) and it will save the data,
> so for saving, you will not have to set up HDFS.
>
>
>
> no, it doesn't work quite like that.
>
>
>
> 1. workers generate their data and save somewhere
>
> 2. on "task commit" they move their data to some location where it will be
> visible for "job commit" (rename, upload, whatever)
>
> 3. job commit —which is done in the driver,— takes all the committed task
> data and makes it visible in the destination directory.
>
> 4. Then they create a _SUCCESS file to say "done!"
>
>
>
>
>
> This is done with Spark talking between workers and drivers to guarantee
> that only one task working on a specific part of the data commits their
> work, only
>
> committing the job once all tasks have finished
>
>
>
> The v1 mapreduce committer implements (2) by moving files under a job
> attempt dir, and (3) by moving it from the job attempt dir to the
> destination. one rename per task commit, another rename of every file on job
> commit. In HDFS, Azure wasb and other stores with an O(1) atomic rename,
> this isn't *too* expensive, though that final job commit rename still takes
> time to list and move lots of files
>
>
>
> The v2 committer implements (2) by renaming to the destination directory and
> (3) as a no-op. Rename in the tasks then, but not that second,
> serialized one at the end
>
>
>
> There's no copy of data from workers to driver, instead you need a shared
> output filesystem so that the job committer can do its work alongside the
> tasks.
>
>
>
> There are alternative committer algorithms,
>
>
>
> 1. look at Ryan Blue's talk: https://www.youtube.com/watch?v=BgHrff5yAQo
>
> 2. IBM Stocator paper (https://arxiv.org/abs/1709.01812) and code
> (https://github.com/SparkTC/stocator/)
>
> 3. Ongoing work in Hadoop itself for better committers. Goal: year end &
> Hadoop 3.1 https://issues.apache.org/jira/browse/HADOOP-13786 . The code is
> all there, Parquet is a troublespot, and more testing is welcome from anyone
> who wants to help.
>
> 4. Databricks have "something"; specifics aren't covered, but I assume it's
> DynamoDB based
>
>
>
>
>
> -Steve
>
>
>
> From: Alexander Czech [mailto:alexander.cz...@googlemail.com]
> Sent: Friday, September 29, 2017 8:15 AM
> To: user@spark.apache.org
> Subject: HDFS or NFS as a cache?
>
>
>
> I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write
> parquet files to S3. But the S3 performance for various reasons is bad when
> I access s3 through the parquet write method:
>
> df.write.parquet('s3a://bucket/parquet')
>
> Now I want to setup a small cache for the parquet output. One output is
> about 12-15 GB in size. Would it be enough to setup a NFS-directory on the
> master, write the output to it and then move it to S3? Or should I setup a
> HDFS on the Master? Or should I even opt for an additional cluster running a
> HDFS solution on more than one node?
>
> thanks!
>
>



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: HDFS or NFS as a cache?

2017-10-02 Thread JG Perrin
Steve,

If I refer to the collect() API, it says “Running collect requires moving all 
the data into the application's driver process, and doing so on a very large 
dataset can crash the driver process with OutOfMemoryError.” So why would you 
need a distributed FS?

jg

From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Saturday, September 30, 2017 6:10 AM
To: JG Perrin 
Cc: Alexander Czech ; user@spark.apache.org
Subject: Re: HDFS or NFS as a cache?


On 29 Sep 2017, at 20:03, JG Perrin wrote:

You will collect in the driver (often the master) and it will save the data, so 
for saving, you will not have to set up HDFS.

no, it doesn't work quite like that.

1. workers generate their data and save it somewhere
2. on "task commit" they move their data to some location where it will be 
visible for "job commit" (rename, upload, whatever)
3. job commit, which is done in the driver, takes all the committed task data
and makes it visible in the destination directory.
4. Then they create a _SUCCESS file to say "done!"


This is done with Spark talking between the workers and the driver to guarantee that
only one task working on a specific part of the data commits its work, and that the
job is committed only once all tasks have finished.

The v1 mapreduce committer implements (2) by moving files under a job attempt
dir, and (3) by moving them from the job attempt dir to the destination: one
rename per task commit, another rename of every file on job commit. In HDFS,
Azure WASB and other stores with an O(1) atomic rename, this isn't *too*
expensive, though that final job-commit rename still takes time to list and move
lots of files.

The v2 committer implements (2) by renaming to the destination directory and
(3) as a no-op. The renames happen in the tasks then, but not that second, serialized
one at the end.
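
As an aside, a minimal sketch of selecting the v2 algorithm from Spark, assuming Hadoop 2.7+ on the classpath (which is where the mapreduce.fileoutputcommitter.algorithm.version property comes from); the path is a placeholder:

from pyspark.sql import SparkSession

# Ask the FileOutputCommitter for the v2 algorithm: task output is renamed
# straight into the destination, and job commit becomes (almost) a no-op.
spark = (SparkSession.builder
         .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
         .getOrCreate())

spark.range(1000).write.mode("overwrite").parquet("s3a://bucket/parquet")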

There's no copy of data from workers to the driver; instead you need a shared
output filesystem so that the job committer can do its work alongside the tasks.

There are alternative committer algorithms:

1. look at Ryan Blue's talk: https://www.youtube.com/watch?v=BgHrff5yAQo
2. IBM Stocator paper (https://arxiv.org/abs/1709.01812) and code 
(https://github.com/SparkTC/stocator/)
3. Ongoing work in Hadoop itself for better committers. Goal: year end & Hadoop
3.1, https://issues.apache.org/jira/browse/HADOOP-13786 . The code is all there;
Parquet is a trouble spot, and more testing is welcome from anyone who wants to
help.
4. Databricks have "something"; specifics aren't covered, but I assume it's
DynamoDB-based.


-Steve







From: Alexander Czech [mailto:alexander.cz...@googlemail.com]
Sent: Friday, September 29, 2017 8:15 AM
To: user@spark.apache.org
Subject: HDFS or NFS as a cache?

I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write Parquet
files to S3. But the S3 performance is, for various reasons, bad when I access S3
through the Parquet write method:

df.write.parquet('s3a://bucket/parquet')
Now I want to set up a small cache for the Parquet output. One output is about
12-15 GB in size. Would it be enough to set up an NFS directory on the master,
write the output to it and then move it to S3? Or should I set up HDFS on the
master? Or should I even opt for an additional cluster running an HDFS solution
on more than one node?
thanks!



RE: Error - Spark reading from HDFS via dataframes - Java

2017-10-02 Thread JG Perrin
@Anastasios: just a word of caution, this is the Spark 1.x CSV parser; there are a few
(minor) changes for Spark 2.x. You can have a look at
http://jgp.net/2017/10/01/loading-csv-in-spark/.

From: Anastasios Zouzias [mailto:zouz...@gmail.com]
Sent: Sunday, October 01, 2017 2:05 AM
To: Kanagha Kumar 
Cc: user @spark 
Subject: Re: Error - Spark reading from HDFS via dataframes - Java

Hi,

Set the inferSchema option to true in spark-csv. You may also want to set the
mode option. See the README below:

https://github.com/databricks/spark-csv/blob/master/README.md
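
For illustration, a minimal PySpark sketch of both approaches with the Spark 2.x built-in CSV reader (the equivalent schema/inferSchema settings also exist on the Java DataFrameReader); paths and column names are placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.getOrCreate()

# Option 1: let Spark infer column types while parsing the CSV.
inferred = (spark.read
            .option("header", "false")
            .option("inferSchema", "true")
            .csv("hdfs:/inputpath/*"))

# Option 2: supply the target schema up front, so fields are parsed directly
# as the desired types and no string-to-bigint cast is needed later.
schema = StructType([
    StructField("COL1", StringType(), True),
    StructField("COL2", StringType(), True),
    StructField("COL3", LongType(), True),
])
typed = (spark.read
         .option("header", "false")
         .schema(schema)
         .csv("hdfs:/inputpath/*"))

typed.printSchema()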

Best,
Anastasios

On 01.10.2017 at 07:58, "Kanagha Kumar" wrote:
Hi,

I'm trying to read data from HDFS in Spark as DataFrames. Printing the schema,
I see all columns are being read as strings. I'm converting it to RDDs and
creating another DataFrame by passing in the correct schema (how the rows
should be interpreted finally).

I'm getting the following error:

Caused by: java.lang.RuntimeException: java.lang.String is not a valid external 
type for schema of bigint


Spark read API:

Dataset<Row> hdfs_dataset = new SQLContext(spark).read()
    .option("header", "false")
    .csv("hdfs:/inputpath/*");

Dataset<Row> ds = new SQLContext(spark)
    .createDataFrame(hdfs_dataset.toJavaRDD(), conversionSchema);
This is the schema to be converted to:
StructType(StructField(COL1,StringType,true),
StructField(COL2,StringType,true),
StructField(COL3,LongType,true),
StructField(COL4,StringType,true),
StructField(COL5,StringType,true),
StructField(COL6,LongType,true))

This is the original schema obtained once the read API was invoked:
StructType(StructField(_c1,StringType,true),
StructField(_c2,StringType,true),
StructField(_c3,StringType,true),
StructField(_c4,StringType,true),
StructField(_c5,StringType,true),
StructField(_c6,StringType,true))

My interpretation is that even when a JavaRDD is converted to a DataFrame by passing
in the new schema, the values are not getting type-cast.
This occurs because the read API above reads the data as string types from
HDFS.

How can I convert an RDD to a DataFrame by passing in the correct schema once it
is read?
How can the values be type-cast correctly during this RDD-to-DataFrame
conversion?

Or how can I read data from HDFS with an input schema in Java?
Any suggestions are helpful. Thanks!




Example of GBTClassifier

2017-10-02 Thread mckunkel
Greetings,
I am trying to run the example in the examples directory for the
GBTClassifier. But when I view this code in Eclipse, I get an error such
that
"The method setLabelCol(String) is undefined for the type GBTClassifier"
for the line

GBTClassifier gbt = new GBTClassifier()
    .setLabelCol("indexedLabel")
    .setFeaturesCol("indexedFeatures")
    .setMaxIter(10);

However, the API says this method is there; Eclipse says it does not.
I did a straight copy-paste, including all imports.

Someone please help.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



PySpark - Expand rows into dataframes via function

2017-10-02 Thread Patrick McCarthy
Hello,

I'm trying to map ARIN registry files into more explicit IP ranges. They
provide a number of IPs in the range (here it's 8192) and a starting IP,
and I'm trying to map it into all the included /24 subnets. For example,

Input:

array(['arin', 'US', 'ipv4', '23.239.160.0', 8192, 20131104.0, 'allocated',

   'ff26920a408f15613096aa7fe0ddaa57'], dtype=object)


Output:

array([['23', '239', '160', 'ff26920a408f15613096aa7fe0ddaa57'],
   ['23', '239', '161', 'ff26920a408f15613096aa7fe0ddaa57'],
   ['23', '239', '162', 'ff26920a408f15613096aa7fe0ddaa57'],

...


I have the input lookup table in a PySpark DF, and a Python function
to do the conversion into the mapped output. I think that to produce the
full mapping I need a UDTF, but this concept doesn't seem to exist in
PySpark. What's the best approach to do this mapping and recombine
into a new DataFrame?
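
For concreteness, a rough sketch of how such a conversion function could be applied with flatMap on the underlying RDD (the field names and /24 arithmetic here are illustrative, not the actual function):

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()

# Placeholder lookup table mirroring the ARIN example above.
lookup = spark.createDataFrame(
    [("23.239.160.0", 8192, "ff26920a408f15613096aa7fe0ddaa57")],
    ["start_ip", "num_ips", "hashkey"])

def expand(row):
    # Yield one Row per /24 subnet covered by this allocation.
    a, b, c, _ = (int(x) for x in row.start_ip.split("."))
    base = (a << 16) + (b << 8) + c        # index of the starting /24
    for i in range(row.num_ips // 256):    # 8192 IPs -> 32 /24 subnets
        n = base + i
        yield Row(oct1=str(n >> 16), oct2=str((n >> 8) & 255),
                  oct3=str(n & 255), hashkey=row.hashkey)

expanded = spark.createDataFrame(lookup.rdd.flatMap(expand))
expanded.show()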


Thanks,

Patrick


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2017-10-02 Thread Pavel Knoblokh
If your processing task inherently processes input data by month, you
may want to "manually" partition the output data by month as well as
by day, that is, to save it under a path that includes the given month,
e.g. "dataset.parquet/month=01". Then you will be able to use the
overwrite mode with each month's partition. Hope this could be of some
help.
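
For illustration, a rough PySpark sketch of that layout, with placeholder data and paths; each run overwrites only the month directory it writes to:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Placeholder data standing in for the real dataset; 'day' drives the partitioning.
df = spark.createDataFrame(
    [("2017-01-01", 1), ("2017-01-02", 2)], ["day", "value"])

month = "2017-01"  # the month being (re)processed
month_df = df.where(F.col("day").startswith(month))

# Writing under a per-month directory means mode("overwrite") only replaces this
# month's subtree; previously written months under dataset.parquet/ are untouched.
(month_df.write
    .partitionBy("day")
    .mode("overwrite")
    .save("dataset.parquet/month=" + month))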

-- 
Pavel Knoblokh

On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
> Hello,
>
> I am trying to use
> data_frame.write.partitionBy("day").save("dataset.parquet") to write a
> dataset while splitting by day.
>
> I would like to run a Spark job  to process, e.g., a month:
> dataset.parquet/day=2017-01-01/...
> ...
>
> and then run another Spark job to add another month using the same folder
> structure, getting me
> dataset.parquet/day=2017-01-01/
> ...
> dataset.parquet/day=2017-02-01/
> ...
>
> However:
> - with save mode "overwrite", when I process the second month, all of
> dataset.parquet/ gets removed and I lose whatever was already computed for
> the previous month.
> - with save mode "append", then I can't get idempotence: if I run the job to
> process a given month twice, I'll get duplicate data in all the subfolders
> for that month.
>
> Is there a way to do "append" in terms of the subfolders from partitionBy,
> but overwrite within each such partition? Any help would be appreciated.
>
> Thanks!



-- 
Pavel Knoblokh

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Should Flume integration be behind a profile?

2017-10-02 Thread Sean Owen
CCing user@
Yeah good point about perhaps moving the examples into the module itself.
Actually removing it would be a long way off, no matter what.

On Mon, Oct 2, 2017 at 8:35 AM Nick Pentreath 
wrote:

> I'd agree with #1 or #2. Deprecation now seems fine.
>
> Perhaps this should be raised on the user list also?
>
> And perhaps it makes sense to look at moving the Flume support into Apache
> Bahir if there is interest (I've cc'ed Bahir dev list here)? That way the
> current state of the connector could keep going for those users who may
> need it.
>
> As for examples, for the Kinesis connector the examples now live in the
> subproject (see e.g. KinesisWordCountASL under external/kinesis-asl). So we
> don't have to completely remove the examples, just move them (this may not
> solve the doc issue but at least the examples are still there for anyone
> who needs them).
>
> On Mon, 2 Oct 2017 at 06:36 Mridul Muralidharan  wrote:
>
>> I agree, proposal 1 sounds better among the options.
>>
>> Regards,
>> Mridul
>>
>>
>> On Sun, Oct 1, 2017 at 3:50 PM, Reynold Xin  wrote:
>> > Probably should do 1, and then it is an easier transition in 3.0.
>> >
>> > On Sun, Oct 1, 2017 at 1:28 AM Sean Owen  wrote:
>> >>
>> >> I tried and failed to do this in
>> >> https://issues.apache.org/jira/browse/SPARK-22142 because it became
>> clear
>> >> that the Flume examples would have to be removed to make this work,
>> too.
>> >> (Well, you can imagine other solutions with extra source dirs or
>> modules for
>> >> flume examples enabled by a profile, but that doesn't help the docs
>> and is
>> >> nontrivial complexity for little gain.)
>> >>
>> >> It kind of suggests Flume support should be deprecated if it's put
>> behind
>> >> a profile. Like with Kafka 0.8. (This is why I'm raising it again to
>> the
>> >> whole list.)
>> >>
>> >> Any preferences among:
>> >> 1. Put Flume behind a profile, remove examples, deprecate
>> >> 2. Put Flume behind a profile, remove examples, but don't deprecate
>> >> 3. Punt until Spark 3.0, when this integration would probably be
>> removed
>> >> entirely (?)
>> >>
>> >> On Tue, Sep 26, 2017 at 10:36 AM Sean Owen  wrote:
>> >>>
>> >>> Not a big deal, but I'm wondering whether Flume integration should at
>> >>> least be opt-in and behind a profile? it still sees some use (at
>> least on
>> >>> our end) but not applicable to the majority of users. Most other
>> third-party
>> >>> framework integrations are behind a profile, like YARN, Mesos,
>> Kinesis,
>> >>> Kafka 0.8, Docker. Just soliciting comments, not arguing for it.
>> >>>
>> >>> (Well, actually it annoys me that the Flume integration always fails
>> to
>> >>> compile in IntelliJ unless you generate the sources manually)
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>