Re: Log4j 1.2.17 spark CVE

2021-12-14 Thread Steve Loughran
log4j 1.2.17 is not vulnerable. There is an existing CVE there from a log
aggregation servlet; Cloudera products ship a patched release with that
servlet stripped... ASF projects are not allowed to do that.

But: some recent Cloudera products do include log4j 2.x, so colleagues of
mine are busy patching and retesting everything. If anyone replaces the
vulnerable jars themselves, remember to look in spark.tar.gz on HDFS to
make sure it is safe.


Hadoop stayed on log4j 1.2.17 because log4j 2.x
* would have broken all cluster management tools which configure
log4j.properties files
* wouldn't let us use system properties to configure logging... that is
really useful when you want to run a job with debug logging
* didn't support the log capture we use in Mockito and functional tests

But: SLF4J is used throughout; Spark doesn't need to be held back by
that choice and can use any backend you want.

I don't know what we will do now; Akira has just suggested Logback
https://issues.apache.org/jira/browse/HADOOP-12956

Had I not just broken a collar bone, and so been unable to code, I would have
added a new command to audit the Hadoop classpath and verify it isn't
vulnerable. Someone could do the same for Spark, where you would want an
RDD so the probe also runs in worker tasks, validating the cluster's safety
more broadly, including the tarball. A rough sketch of that idea follows.
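
The sketch below is a hypothetical illustration only, not an existing Spark or
Hadoop command: it looks for the log4j 2.x JndiLookup class (the marker of the
recent CVE) on the driver's classpath and inside worker tasks. The object name
and the parallelism are made up for the example.

// Hypothetical classpath audit: probe the driver JVM and worker task JVMs
// for the log4j 2.x JndiLookup class.
import org.apache.spark.sql.SparkSession

object Log4jAudit {
  // returns the resource URL of JndiLookup if present on this JVM's classpath
  def probe(): String = {
    val marker = "org/apache/logging/log4j/core/lookup/JndiLookup.class"
    Option(Thread.currentThread().getContextClassLoader.getResource(marker))
      .map(_.toString).getOrElse("not found")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("log4j-audit").getOrCreate()
    println(s"driver: ${probe()}")
    // run the same probe in many tasks so it lands on (roughly) every executor
    val results = spark.sparkContext.parallelize(1 to 100, 100)
      .map(_ => s"${java.net.InetAddress.getLocalHost.getHostName}: ${probe()}")
      .distinct().collect()
    results.foreach(println)
    spark.stop()
  }
}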

Meanwhile, if your product is not exposed, it is probably worth saying so on the
users mailing list to help people focus their attention. It's
probably best to work with everyone who produces Spark-based products so
that you can have a single summary.

On Tue, 14 Dec 2021 at 01:31, Qian Sun  wrote:

> My understanding is that we don’t need to do anything. Log4j2-core not
> used in spark.
>
> > 2021年12月13日 下午12:45,Pralabh Kumar  写道:
> >
> > Hi developers,  users
> >
> > Spark is built using log4j 1.2.17 . Is there a plan to upgrade based on
> recent CVE detected ?
> >
> >
> > Regards
> > Pralabh kumar
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Missing module spark-hadoop-cloud in Maven central

2021-06-02 Thread Steve Loughran
off the record: Really irritates me too, as it forces me to do local builds
even though I shouldn't have to. Sometimes I do that for other reasons, but
still.

Getting the cloud-storage module in was hard enough at the time that I
wasn't going to push harder; I essentially stopped trying to get one into
Spark after that, having effectively been told to go and play in my own fork
(*).

https://github.com/apache/spark/pull/12004#issuecomment-259020494

Given that effort almost failed, to then say "now include the artifact in
releases" wasn't something I was going to do; I had everything I needed for
my own build, and trying to add new PRs struck me as an exercise in
confrontation and futility.

Sean, if I do submit a PR which makes hadoop-cloud default on the right
versions, but strips out the dependencies on the final tarball, would that
get some attention?

(*) Sean of course, was a notable exception and very supportive.







On Wed, 2 Jun 2021 at 00:56, Stephen Coy  wrote:

> I have been building Apache Spark from source just so I can get this
> dependency.
>
>
>    1. git checkout v3.1.1
>    2. dev/make-distribution.sh --name hadoop-cloud-3.2 --tgz -Pyarn
>       -Phadoop-3.2 -Phadoop-cloud -Phive-thriftserver
>       -Dhadoop.version=3.2.0
>
>
> It is kind of a nuisance having to do this though.
>
> Steve C
>
>
> On 31 May 2021, at 10:34 pm, Sean Owen  wrote:
>
> I know it's not enabled by default when the binary artifacts are built,
> but not exactly sure why it's not built separately at all. It's almost a
> dependencies-only pom artifact, but there are two source files. Steve do
> you have an angle on that?
>
> On Mon, May 31, 2021 at 5:37 AM Erik Torres  wrote:
>
>> Hi,
>>
>> I'm following this documentation to
>> configure my Spark-based application to interact with Amazon S3. However, I
>> cannot find the spark-hadoop-cloud module in Maven central for the
>> non-commercial distribution of Apache Spark. From the documentation I would
>> expect that I can get this module as a Maven dependency in my project.
>> However, I ended up building the spark-hadoop-cloud module from Spark's
>> code.
>>
>> Is this the expected way to setup the integration with Amazon S3? I think
>> I'm missing something here.
>>
>> Thanks in advance!
>>
>> Erik
>>
>


Re: java.lang.ClassNotFoundException for s3a comitter

2020-07-21 Thread Steve Loughran
On Tue, 7 Jul 2020 at 03:42, Stephen Coy 
wrote:

> Hi Steve,
>
> While I understand your point regarding the mixing of Hadoop jars, this
> does not address the java.lang.ClassNotFoundException.
>
> Prebuilt Apache Spark 3.0 builds are only available for Hadoop 2.7 or
> Hadoop 3.2. Not Hadoop 3.1.
>

sorry, I should have been clearer. Hadoop 3.2.x has everything you need.
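
For reference, here is a minimal sketch of the committer wiring in
spark-defaults.conf form, once the spark-hadoop-cloud classes are on the
classpath. The property names are the ones used elsewhere in this thread; the
exact committer choice ("directory", "partitioned" or "magic") depends on your
setup, so treat the values as an example rather than a recommendation:

spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter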



>
> The only place that I have found that missing class is in the Spark
> “hadoop-cloud” source module, and currently the only way to get the jar
> containing it is to build it yourself. If any of the devs are listening it
>  would be nice if this was included in the standard distribution. It has a
> sizeable chunk of a repackaged Jetty embedded in it which I find a bit odd.
>
> But I am relatively new to this stuff so I could be wrong.
>
> I am currently running Spark 3.0 clusters with no HDFS. Spark is set up
> like:
>
> hadoopConfiguration.set("spark.hadoop.fs.s3a.committer.name",
> "directory");
> hadoopConfiguration.set("spark.sql.sources.commitProtocolClass",
> "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol");
> hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
> "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter");
> hadoopConfiguration.set("fs.s3a.connection.maximum",
> Integer.toString(coreCount * 2));
>
> Querying and updating s3a data sources seems to be working ok.
>
> Thanks,
>
> Steve C
>
> On 29 Jun 2020, at 10:34 pm, Steve Loughran 
> wrote:
>
> you are going to need hadoop-3.1 on your classpath, with hadoop-aws and
> the same aws-sdk it was built with (1.11.something). Mixing Hadoop JARs is
> doomed. Using a different AWS SDK JAR is a bit risky, though more recent
> upgrades have all been fairly low stress.
>
> On Fri, 19 Jun 2020 at 05:39, murat migdisoglu 
> wrote:
>
>> Hi all
>> I've upgraded my test cluster to Spark 3 and changed my committer to
>> directory, and I still get this error. The documentation is somewhat
>> obscure on that.
>> Do I need to add a third-party jar to support the new committers?
>>
>> java.lang.ClassNotFoundException:
>> org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
>>
>>
>> On Thu, Jun 18, 2020 at 1:35 AM murat migdisoglu <
>> murat.migdiso...@gmail.com> wrote:
>>
>>> Hello all,
>>> we have a hadoop cluster (using yarn) using  s3 as filesystem with
>>> s3guard is enabled.
>>> We are using hadoop 3.2.1 with spark 2.4.5.
>>>
>>> When I try to save a dataframe in parquet format, I get the following
>>> exception:
>>> java.lang.ClassNotFoundException:
>>> com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol
>>>
>>> My relevant spark configurations are as following:
>>>
>>> "hadoop.mapreduce.outputcommitter.factory.scheme.s3a":"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
>>> "fs.s3a.committer.name
>>> <https://aus01.safelinks.protection.outlook.com/?url=http%3A%2F%2Ffs.s3a.committer.name%2F=02%7C01%7Cscoy%40infomedia.com.au%7C25d6f7b564dd4cb53e5508d81c28e645%7C45d5407150f849caa59f9457123dc71c%7C0%7C0%7C637290309277792405=jxbuOsgSShhHZcXjrjkZmJ4DCXIXstzRFSOaOEEadRE%3D=0>":
>>> "magic",
>>> "fs.s3a.committer.magic.enabled": true,
>>> "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
>>>
>>> While spark streaming fails with the exception above, apache beam
>>> succeeds writing parquet files.
>>> What might be the problem?
>>>
>>> Thanks in advance
>>>
>>>
>>> --
>>> "Talkers aren’t good doers. Rest assured that we’re going there to use
>>> our hands, not our tongues."
>>> W. Shakespeare
>>>
>>
>>
>> --
>> "Talkers aren’t good doers. Rest assured that we’re going there to use
>> our hands, not our tongues."
>> W. Shakespeare
>>
>
>


Re: java.lang.ClassNotFoundException for s3a comitter

2020-06-29 Thread Steve Loughran
you are going to need hadoop-3.1 on your classpath, with hadoop-aws and the
same aws-sdk it was built with (1.11.something). Mixing Hadoop JARs is
doomed. Using a different AWS SDK JAR is a bit risky, though more recent
upgrades have all been fairly low stress.

On Fri, 19 Jun 2020 at 05:39, murat migdisoglu 
wrote:

> Hi all
> I've upgraded my test cluster to Spark 3 and changed my committer to
> directory, and I still get this error. The documentation is somewhat
> obscure on that.
> Do I need to add a third-party jar to support the new committers?
>
> java.lang.ClassNotFoundException:
> org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
>
>
> On Thu, Jun 18, 2020 at 1:35 AM murat migdisoglu <
> murat.migdiso...@gmail.com> wrote:
>
>> Hello all,
>> we have a hadoop cluster (using yarn) using  s3 as filesystem with
>> s3guard is enabled.
>> We are using hadoop 3.2.1 with spark 2.4.5.
>>
>> When I try to save a dataframe in parquet format, I get the following
>> exception:
>> java.lang.ClassNotFoundException:
>> com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol
>>
>> My relevant spark configurations are as following:
>>
>> "hadoop.mapreduce.outputcommitter.factory.scheme.s3a":"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
>> "fs.s3a.committer.name": "magic",
>> "fs.s3a.committer.magic.enabled": true,
>> "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
>>
>> While spark streaming fails with the exception above, apache beam
>> succeeds writing parquet files.
>> What might be the problem?
>>
>> Thanks in advance
>>
>>
>> --
>> "Talkers aren’t good doers. Rest assured that we’re going there to use
>> our hands, not our tongues."
>> W. Shakespeare
>>
>
>
> --
> "Talkers aren’t good doers. Rest assured that we’re going there to use
> our hands, not our tongues."
> W. Shakespeare
>


Re: [Spark Streaming] Spark Streaming with S3 vs Kinesis

2018-06-26 Thread Steve Loughran


On 25 Jun 2018, at 23:59, Farshid Zavareh <fhzava...@gmail.com> wrote:

I'm writing a Spark Streaming application where the input data is put into an 
S3 bucket in small batches (using Database Migration Service - DMS). The Spark 
application is the only consumer. I'm considering two possible architectures:

Have Spark Streaming watch an S3 prefix and pick up new objects as they come in
Stream data from S3 to a Kinesis stream (through a Lambda function triggered as 
new S3 objects are created by DMS) and use the stream as input for the Spark 
application.
While the second solution will work, the first solution is simpler. But are 
there any pitfalls? Looking at this guide, I'm concerned about two specific 
points:

> The more files under a directory, the longer it will take to scan for changes 
> — even if no files have been modified.

We will be keeping the S3 data indefinitely. So the number of objects under the 
prefix being monitored is going to increase very quickly.


There's a slightly more optimised streaming source for cloud streams here:

https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/org/apache/spark/streaming/hortonworks/CloudInputDStream.scala


Even so, the cost of scanning S3 is one LIST request per 5000 objects; I'll
leave it to you to work out how many there will be in your application, and how
much it will cost (as a rough example, a million objects under the monitored
prefix would mean around 200 LIST calls on every scan). And of course, the more
LIST calls there are, the longer things take, and the bigger your window needs to be.


> “Full” Filesystems such as HDFS tend to set the modification time on their 
> files as soon as the output stream is created. When a file is opened, even 
> before data has been completely written, it may be included in the DStream - 
> after which updates to the file within the same window will be ignored. That 
> is: changes may be missed, and data omitted from the stream.

I'm not sure if this applies to S3, since to my understanding objects are 
created atomically and cannot be updated afterwards as is the case with 
ordinary files (unless deleted and recreated, which I don't believe DMS does)


Objects written to S3 aren't visible until the upload completes, in an atomic
operation. You can write in place and not worry.

The timestamp on S3 artifacts comes from the PUT time. On multipart uploads of
many MB/many GB, that's when the first POST to initiate the MPU is
kicked off. So if the upload starts in time window t1 and completes in window
t2, the object won't be visible until t2, but the timestamp will be that of t1.
Bear that in mind.

The Lambda callback probably does have better scalability and resilience; I've
not tried it myself.





Thanks for any help!



Re: Palantir replease under org.apache.spark?

2018-01-11 Thread Steve Loughran


On 9 Jan 2018, at 18:10, Sean Owen 
> wrote:

Just to follow up -- those are actually in a Palantir repo, not Central. 
Deploying to Central would be uncourteous, but this approach is legitimate and 
how it has to work for vendors to release distros of Spark etc.


ASF processes are set up to stop people pushing any org.apache.* artifact out to
Maven Central without going through the signing process; if someone does, then
that's a major problem.

On Tue, Jan 9, 2018 at 11:43 AM Nan Zhu 
> wrote:
Hi, all

Out of curiosity, I just found a bunch of Palantir releases under org.apache.spark
in Maven Central
(https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11)?

Is it on purpose?

Best,

Nan





Re: Writing files to s3 with out temporary directory

2017-12-01 Thread Steve Loughran


Hadoop trunk (i.e. 3.1 when it comes out) has the code to do zero-rename commits:


http://steveloughran.blogspot.co.uk/2017/11/subatomic.html

if you want to play today, you can build Hadoop trunk & Spark master, plus a
little glue JAR of mine to get Parquet to play properly:

http://steveloughran.blogspot.co.uk/2017/11/how-to-play-with-new-s3a-committers.html



On 21 Nov 2017, at 15:03, Jim Carroll 
> wrote:

It's not actually that tough. We already use a custom Hadoop FileSystem for
S3 because when we started using Spark with S3 the native FileSystem was
very unreliable. Our's is based on the code from Presto. (see
https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java
).

I already have a version that introduces a hash to the filename for the file
that's actually written to the S3 to see if it makes a difference per
https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html#get-workload-considerations
. FWIW, it doesn't.

AFAIK, the higher up the directory tree the hash appears, the better. The
classic partitioned layout here is exactly what you don't want.


I'm going to modify that experiment to override the key
name like before except actually move the file, keep track of the state, and
override the rename method.


You might find this interesting too: https://arxiv.org/abs/1709.01812

IBM's Stocator FS remaps from dest/_temporary/$jobAttempt/$taskAttempt/part-
to a file dest/part-$jobAttempt-$taskAttempt-000

This makes it possible to clean up failed tasks & jobs; without that, any task
failure means the entire job has to be failed.



The problems with this approach are: 1) it's brittle because it depends on
the internal directory and file naming conventions in Hadoop and Parquet.


They do, but the actual workers have the right to generate files with different
names than part-.$suffix, stick in summary files, etc. They may even create no
files at all, which is what ORC does when there are no results for that part.


2)
It will assume (as seems to be currently the case) that the 'rename' call is
done for all files from the driver.


The first step to the new committers was to look at all the code where the old
ones were called, including stepping through with a debugger to work out
exactly what the two intermingled commit algorithms were up to:

https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md

But it should do until there's a better
solution in the Hadoop committer.



If you are at the stage where you have your own FS implementation, you are 
probably ready to pick up & play with the new s3a committers.



Re: Process large JSON file without causing OOM

2017-11-15 Thread Steve Loughran


On 14 Nov 2017, at 15:32, Alec Swan 
> wrote:

 But I wonder if there is a way to stream/batch the content of JSON file in 
order to convert it to ORC piecemeal and avoid reading the whole JSON file in 
memory in the first place?




That is what you'll need to do; you'd hit similar problems if you had the same 
files, same allocated JVM space and the same # of threads trying to read in the 
files.

Jackson has a streaming API: http://www.baeldung.com/jackson-streaming-api
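
As a minimal sketch of that approach (written in Scala here purely for
illustration; the file name and per-record handling are placeholders, not from
the original mail), the streaming parser pulls one record at a time so the whole
file never has to sit in memory:

// Stream a large top-level JSON array with Jackson's streaming API,
// handling one object at a time instead of loading the whole file.
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

object StreamJson {
  def main(args: Array[String]): Unit = {
    val parser = new JsonFactory().createParser(new java.io.File("records.json"))
    try {
      // expect a top-level JSON array: [ {...}, {...}, ... ]
      if (parser.nextToken() == JsonToken.START_ARRAY) {
        while (parser.nextToken() == JsonToken.START_OBJECT) {
          // convert/write this one record here (e.g. append it to an ORC writer);
          // skipChildren() just consumes the object without buffering anything else
          parser.skipChildren()
        }
      }
    } finally {
      parser.close()
    }
  }
}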


Re: Anyone knows how to build and spark on jdk9?

2017-10-30 Thread Steve Loughran

On 27 Oct 2017, at 19:24, Sean Owen 
> wrote:

Certainly, Scala 2.12 support precedes Java 9 support. A lot of the work is in 
place already, and the last issue is dealing with how Scala closures are now 
implemented quite different with lambdas / invokedynamic. This affects the 
ClosureCleaner. For the interested, this is as far as I know the main remaining 
issue:

Despite the odd naming, all of these versions of Java are successors to Java 9. 
Supporting any of them is probably the same thing, so, the work is still for 
now getting it working on Java 9.

Whereas Java has been very backwards-compatible in the past, the new module 
structure is almost certain to break something in Spark or its dependencies. 
Removing JAXB from the JDK alone causes issues. Getting it to run at all on 
Java 9 may require changes, whereas compatibility with new Java major releases 
in the past generally came for free. It'll be worth trying to make that happen 
soonish. I'm guessing for Spark 3.x in first half of next year?


it is going to be traumatic across the stack, but it's probably best to start
it as a background activity, just to be aware of what's going to work and where
the trouble is (*).

But, first things first. Scala 2.12 support.

On Fri, Oct 27, 2017 at 6:02 PM Jörn Franke 
> wrote:
Scala 2.12 is not yet supported on Spark - this means also not JDK9:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-14220

If you look at the Oracle support then jdk 9 is anyway only supported for 6 
months. JDK 8 is Lts (5 years) JDK 18.3 will be only 6 months and JDK 18.9 is 
lts (5 years).
http://www.oracle.com/technetwork/java/eol-135779.html

I do not think Spark should support non-lts releases. Especially for JDK9 I do 
not see a strong technical need, but maybe I am overlooking something. Of 
course http2 etc would be nice for the web interfaces, but currently not very 
urgent.


Oracle's new retirement strategy is "odd": it'll essentially be killing Java 9
updates before Java 8, and retiring Java 8 at the same time as the March '18
release. Like you say, not very motivational for an update.

At the same time: Java 8 is going away, and at some point the move to the new 
versions will be needed, even if the new version isn't JDK9 itself. It's 
generally helpful to be a bit proactive, especially getting all the 
dependencies bumped up, sorting out build & test. The real enemy is any 
incompatible change needed in the code, or something which breaks public/stable 
APIs. That and some dependency on a library which is not compatible with java 9 
and which lacks a replacement. Either you take on the maintenance yourself 
(bad),  or you do the migration.




(*) I predict "Kerberos". It's always Kerberos. A move to a per-app JRE will
complicate enabling full-strength encryption, as the ASF isn't going to be
able to ship the extended crypto JAR needed for Kerberos and 256-bit keys.

-Steve


Re: Why does Spark need to set log levels

2017-10-12 Thread Steve Loughran

> On 9 Oct 2017, at 16:49, Daan Debie  wrote:
> 
> Hi all!
> 
> I would love to use Spark with a somewhat more modern logging framework than 
> Log4j 1.2. I have Logback in mind, mostly because it integrates well with 
> central logging solutions such as the ELK stack. I've read up a bit on 
> getting Spark 2.0 (that's what I'm using currently) to work with anything 
> else than Log4j 1.2, and it seems nigh impossible.
> 
> If I understood the problem correctly from the various JIRA issues I read, it 
> seems Spark needs to be able to set the log-level programmatically, which 
> slf4j doesn't support, and as such, Spark integrates with Log4j 1.2 on a deep 
> level.
> 
> My question: why would Spark want to set log levels programmatically? Why not 
> leave it to the user of Spark to provide a logging configuration that suits 
> his/her needs? That way, the offending code that integrates with Log4j 
> directly, could be removed, and Spark can start relying only on the slf4j 
> API, as any good library should.
> 
> I'm curious about the motivations of the Spark dev team on this!
> 
> Daan

AFAIK log level setting is primarily done in test code & not production: Spark
has gone to a lot of effort to be logger independent; if you look at
org.apache.spark.internal.Logging it's explicitly probing for Log4J as the
logger back end and only doing some config stuff (falling back to a base
log4j.properties file) if there isn't a default one.

I'd expect the dependencies to follow, with commons-logging being the general 
bridge API for anything yet to fully embrace SLF4J. Though some things will 
inevitably use java.util.logging & so you'll get stuff appearing on stderr no 
matter how hard you try, usually something important like Kerberos errors


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Quick one... AWS SDK version?

2017-10-04 Thread Steve Loughran

On 3 Oct 2017, at 21:37, JG Perrin <jper...@lumeris.com> wrote:

Sorry Steve – I may not have been very clear: thinking about 
aws-java-sdk-z.yy.xxx.jar. To the best of my knowledge, none is bundled with 
Spark.


I know, but if you are talking to s3 via the s3a client, you will need the SDK 
version to match the hadoop-aws JAR of the same version of Hadoop your JARs 
have. Similarly, if you were using spark-kinesis, it needs to be in sync there.

From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Tuesday, October 03, 2017 2:20 PM
To: JG Perrin <jper...@lumeris.com>
Cc: user@spark.apache.org
Subject: Re: Quick one... AWS SDK version?


On 3 Oct 2017, at 02:28, JG Perrin <jper...@lumeris.com> wrote:

Hey Sparkians,

What version of AWS Java SDK do you use with Spark 2.2? Do you stick with the 
Hadoop 2.7.3 libs?

You generally have to stick with the version which Hadoop was built with, I'm
afraid... very brittle dependency.



Re: Quick one... AWS SDK version?

2017-10-03 Thread Steve Loughran

On 3 Oct 2017, at 02:28, JG Perrin 
> wrote:

Hey Sparkians,

What version of AWS Java SDK do you use with Spark 2.2? Do you stick with the 
Hadoop 2.7.3 libs?

You generally have to stick with the version which Hadoop was built with, I'm
afraid... very brittle dependency.


Re: how do you deal with datetime in Spark?

2017-10-03 Thread Steve Loughran

On 3 Oct 2017, at 18:43, Adaryl Wakefield 
> wrote:

I gave myself a project to start actually writing Spark programs. I’m using 
Scala and Spark 2.2.0. In my project, I had to do some grouping and filtering 
by dates. It was awful and took forever. I was trying to use dataframes and SQL 
as much as possible. I see that there are date functions in the dataframe API 
but trying to use them was frustrating. Even following code samples was a 
headache because apparently the code is different depending on which version of 
Spark you are using. I was really hoping for a rich set of date functions like 
you’d find in T-SQL but I never really found them.

Is there a best practice for dealing with dates and time in Spark? I feel like 
taking a date/time string and converting it to a date/time object and then 
manipulating data based on the various components of the timestamp object 
(hour, day, year etc.) should be a heck of a lot easier than what I’m finding 
and perhaps I’m just not looking in the right place.

You can see my work here: 
https://github.com/BobLovesData/Apache-Spark-In-24-Hours/blob/master/src/net/massstreet/hour10/BayAreaBikeAnalysis.scala


Once you've done that one, I have a few hundred MB of London bike stats if you
want them. Their timestamps come in as strings, but "01/01/1970" is by far the
most popular dropoff time, which is 0 in the epoch... A rough parsing sketch
follows the sample below.

9809600,0,6248,01/01/1970 00:00,0,NA,31/01/2012 19:31,365,City Road: Angel
9806201,0,6422,01/01/1970 00:00,0,NA,31/01/2012 19:32,17,Hatton Wall: Holborn
9802063,0,4096,01/01/1970 00:00,0,NA,31/01/2012 19:34,338,Wellington Street : 
Strand
9804765,0,5276,01/01/1970 00:00,0,NA,31/01/2012 19:37,93,Cloudesley Road: Angel
9806779,1970,14,31/01/2012 20:11,410,Edgware Road Station: Paddington
981,0,5810,01/01/1970 00:00,0,NA,31/01/2012 19:39,114,Park Road (Baker 
Street): Regent's Park
9803952,0,5682,01/01/1970 00:00,0,NA,31/01/2012 19:41,210,Hinde Street: 
Marylebone
9818659,0,5572,01/01/1970 00:00,0,NA,31/01/2012 19:41,87,Devonshire Square: 
Liverpool Street
9808144,0,5244,01/01/1970 00:00,0,NA,31/01/2012 19:42,374,Waterloo Station 1: 
Waterloo
9814365,0,5422,01/01/1970 00:00,0,NA,31/01/2012 19:48,15,Great Russell Street: 
Bloomsbury
9816863,0,6079,01/01/1970 00:00,0,NA,31/01/2012 19:49,258,Kensington Gore: 
Knightsbridge
9818469,0,4903,01/01/1970 00:00,0,NA,31/01/2012 19:50,341,Craven Street: Strand
9811512,0,5572,01/01/1970 00:00,0,NA,31/01/2012 19:50,298,Curlew Street: Shad 
Thames
9817931,0,708,01/01/1970 00:00,0,NA,31/01/2012 19:51,341,Craven Street: Strand
9816429,0,3210,01/01/1970 00:00,0,NA,31/01/2012 19:59,388,Southampton Street: 
Strand
9806284,0,4359,01/01/1970 00:00,0,NA,31/01/2012 20:06,335,Tavistock Street: 
Covent Garden


Adaryl "Bob" Wakefield, MBA
Principal
Mass Street Analytics, LLC
913.938.6685
www.massstreet.net
www.linkedin.com/in/bobwakefieldmba
Twitter: @BobLovesData



Re: More instances = slower Spark job

2017-10-01 Thread Steve Loughran

On 28 Sep 2017, at 15:27, Daniel Siegmann 
> wrote:


Can you kindly explain how Spark uses parallelism for bigger (say 1GB) text 
file? Does it use InputFormat do create multiple splits and creates 1 partition 
per split? Also, in case of S3 or NFS, how does the input split work? I 
understand for HDFS files are already pre-split so Spark can use dfs.blocksize 
to determine partitions. But how does it work other than HDFS?

S3 is similar to HDFS I think. I'm not sure off-hand how exactly it decides to 
split for the local filesystem. But it does. Maybe someone else will be able to 
explain the details.


HDFS files are split into blocks, each with a real block size and location,
which is set when the file was written/copied.

If you have a 100 MB file replicated on 3 machines with a block size of 64MB, you
will have two blocks for the file: 64MB and 36MB, with three replicas of each
block. Blocks are placed across machines (normally, 2 hosts on one rack, 1 on
a different rack; this gives you better resilience to failures of rack
switches). There's no attempt to colocate blocks of the same file, *except*
that HDFS will attempt to write every block onto the host where the program
generating the data is running. So, space permitting, if the 100MB file is
created on host 1, then host 1 will have block-1 replica-1, and
block-2-replica-1, with the others scattered around the cluster.

The code is actually

https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L386


Because it's fixed in HDFS, you get the block size used at creation time;
different formats may provide their own split information independent of that
block size though. (This also means that if you have different block sizes for
different files in the set of files you process, there may be different splits
for each file, as well as different locations.)

With HDFS replication, you get the bandwidth of all the hard disks serving up
data. With a 100 MB file split in two, if those blocks were actually saved onto
different physical hard disks (say SAS disks with 6 gb/s), then you have 3 x 2
x 6 gb/s bandwidth, for a max of 24 gb/s (of course, there's the other work
competing for disk IO); that's the maximum. If Spark schedules the work on those
machines and you have the Hadoop native libraries installed (i.e. you don't get
told off in the logs for not having them), then the HDFS client running in the
Spark processes can talk direct to the HDFS datanode and get a native OS
file handle to read those blocks: there isn't even a network stack to
interfere. If you are working with remote data, then the network slows things
down.

The S3A client just makes things up: you can configure the settings to lie
about the block size. If you have 100MB files and want to split the work five
ways in that job, set

spark.hadoop.fs.s3a.block.size = 20971520

The other object stores have different options, but it's the same thing really.
You get to choose client-side what Spark is told, which is then used by the
driver to make its decisions about which splits to give to which workers for
processing, the order, etc.

Unlike HDFS, the bandwidth you get off S3 for a single file is fixed,
irrespective of how many blocks you tell the client there are. Declaring
a lower block size and so allowing more workers at the data isn't going
to guarantee more performance; you'll just be sharing the same IO rate.

...though, talking to S3, a big factor in performance working with the data is 
actually cost of breaking and recreating HTTP connections, which happens a lot 
if you have seek-heavy code reading large files. And the columnar formats, ORC 
and Parquet, are seek heavy, provided they aren't gzipped. Reading these files 
has pretty awful performance until you run Hadoop 2.8+ and tell S3A that you 
are doing random IO (which kills .gz reading, use wisely)

spark.hadoop.fs.s3a.experimental.fadvise random


All this stuff and more is all in the source files —don't be afraid to look 
into it to see what's going on. I always recommend starting with the stack 
traces you get when things aren't working right. If you are using S3, that's 
all in : 
https://github.com/apache/hadoop/tree/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a

-steve




Re: More instances = slower Spark job

2017-10-01 Thread Steve Loughran

> On 28 Sep 2017, at 14:45, ayan guha  wrote:
> 
> Hi
> 
> Can you kindly explain how Spark uses parallelism for bigger (say 1GB) text 
> file? Does it use InputFormat do create multiple splits and creates 1 
> partition per split?

Yes, input formats give you their splits; this is usually used to decide how to
break things up. As to how that gets used: you'll have to look at the source, as
I'll only get it wrong. Key point: it's part of the information which can be
used to partition the work, but the number of available workers is the other
big factor.



> Also, in case of S3 or NFS, how does the input split work? I understand for 
> HDFS files are already pre-split so Spark can use dfs.blocksize to determine 
> partitions. But how does it work other than HDFS?

there's invariably a config option to let you tell Spark what block size to
work with, e.g. fs.s3a.block.size, which you set in spark-defaults to
something like

spark.hadoop.fs.s3a.block.size 67108864

to set it to 64MB. 

HDFS also provides locality information: where the data is. Other filesystems
don't do that; they usually just say "localhost", which Spark recognises as
"anywhere"... it schedules work on different parts of a file wherever there is
free capacity.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: HDFS or NFS as a cache?

2017-09-30 Thread Steve Loughran

On 29 Sep 2017, at 20:03, JG Perrin 
> wrote:

You will collect in the driver (often the master) and it will save the data, so 
for saving, you will not have to set up HDFS.

no, it doesn't work quite like that.

1. workers generate their data and save it somewhere
2. on "task commit" they move their data to some location where it will be
visible for "job commit" (rename, upload, whatever)
3. job commit, which is done in the driver, takes all the committed task data
and makes it visible in the destination directory.
4. Then they create a _SUCCESS file to say "done!"


This is done with Spark talking between workers and driver to guarantee that
only one task working on a specific part of the data commits its work, and that
the job is only committed once all tasks have finished.

The v1 mapreduce committer implements (2) by moving files under a job attempt
dir, and (3) by moving them from the job attempt dir to the destination: one
rename per task commit, another rename of every file on job commit. In HDFS,
Azure WASB and other stores with an O(1) atomic rename, this isn't *too*
expensive, though that final job commit rename still takes time to list and move
lots of files.

The v2 committer implements (2) by renaming to the destination directory and
(3) as a no-op. Renames happen in the tasks then, but not that second,
serialized one at the end.

There's no copy of data from workers to driver, instead you need a shared 
output filesystem so that the job committer can do its work alongside the tasks.
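
For what it's worth, the choice between the two is driven by the standard
MapReduce committer property; a minimal spark-defaults.conf sketch (a value of 2
selects the v2 behaviour described above; the v1 algorithm is the default):

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2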

There are alternative committer algorithms:

1. look at Ryan Blue's talk: https://www.youtube.com/watch?v=BgHrff5yAQo
2. IBM Stocator paper (https://arxiv.org/abs/1709.01812) and code 
(https://github.com/SparkTC/stocator/)
3. Ongoing work in Hadoop itself for better committers. Goal: year end & Hadoop
3.1 https://issues.apache.org/jira/browse/HADOOP-13786 . The code is all there,
Parquet is a trouble spot, and more testing is welcome from anyone who wants to
help.
4. Databricks have "something"; specifics aren't covered, but I assume it's
DynamoDB based.


-Steve






From: Alexander Czech [mailto:alexander.cz...@googlemail.com]
Sent: Friday, September 29, 2017 8:15 AM
To: user@spark.apache.org
Subject: HDFS or NFS as a cache?

I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write parquet 
files to S3. But the S3 performance for various reasons is bad when I access s3 
through the parquet write method:

df.write.parquet('s3a://bucket/parquet')
Now I want to setup a small cache for the parquet output. One output is about 
12-15 GB in size. Would it be enough to setup a NFS-directory on the master, 
write the output to it and then move it to S3? Or should I setup a HDFS on the 
Master? Or should I even opt for an additional cluster running a HDFS solution 
on more than one node?
thanks!



Re: HDFS or NFS as a cache?

2017-09-30 Thread Steve Loughran

On 29 Sep 2017, at 15:59, Alexander Czech 
> wrote:

Yes I have identified the rename as the problem, that is why I think the extra
bandwidth of the larger instances might not help. Also there is a consistency
issue with S3 because of how the rename works, so I probably lose data.

correct

rename is mimicked with a COPY + DELETE; the copy is done inside S3 and your
bandwidth appears to be 6-10 MB/s, so a 12-15 GB output would spend roughly
20-40 minutes on that copy alone.


On Fri, Sep 29, 2017 at 4:42 PM, Vadim Semenov 
> wrote:
How many files you produce? I believe it spends a lot of time on renaming the 
files because of the output committer.
Also instead of 5x c3.2xlarge try using 2x c3.8xlarge instead because they have 
10GbE and you can get good throughput for S3.

On Fri, Sep 29, 2017 at 9:15 AM, Alexander Czech 
> wrote:
I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write parquet 
files to S3. But the S3 performance for various reasons is bad when I access s3 
through the parquet write method:

df.write.parquet('s3a://bucket/parquet')

Now I want to setup a small cache for the parquet output. One output is about 
12-15 GB in size. Would it be enough to setup a NFS-directory on the master, 
write the output to it and then move it to S3? Or should I setup a HDFS on the 
Master? Or should I even opt for an additional cluster running a HDFS solution 
on more than one node?

thanks!





Re: More instances = slower Spark job

2017-09-28 Thread Steve Loughran

On 28 Sep 2017, at 09:41, Jeroen Miller 
> wrote:

Hello,

I am experiencing a disappointing performance issue with my Spark jobs
as I scale up the number of instances.

The task is trivial: I am loading large (compressed) text files from S3,
filtering out lines that do not match a regex, counting the numbers
of remaining lines and saving the resulting datasets as (compressed)
text files on S3. Nothing that a simple grep couldn't do, except that
the files are too large to be downloaded and processed locally.

On a single instance, I can process X GBs per hour. When scaling up
to 10 instances, I noticed that processing the /same/ amount of data
actually takes /longer/.

This is quite surprising as the task is really simple: I was expecting
a significant speed-up. My naive idea was that each executors would
process a fraction of the input file, count the remaining lines /locally/,
and save their part of the processed file /independently/, thus no data
shuffling would occur.

Obviously, this is not what is happening.

Can anyone shed some light on this or provide pointers to relevant
information?


Expect the bandwidth of file input to be shared across all workers trying to 
read different parts of the same file. Two workers reading a file: half the B/W 
each. HDFS avoids this by splitting files into blocks and replicating each one 
by three: max bandwidth of a file is 3 * blocks(file). For S3, if one reader 
has bandwidth B, two readers get bandwidth B/2.


There are some subtleties related to multipart upload: know that if you can do
multipart upload/copy with an upload size M, and set the emulated block size on
the clients to the same value, then you should get better parallelism, as the
read bandwidth is really per uploaded block:


fs.s3a.block.size 64M
fs.s3a.multipart.size 64M
fs.s3a.multipart.threshold 64M


The other scale issue is that AWS throttles S3 IO per shard of the data, sends 
503 responses back to the clients, which then sleep a bit to back off. Add more 
clients, more throttling, more sleep, same in-shard bandwidth.

a single bucket can have data on >1 shard, where different parts of the tree 
are on different shards

http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html

Sadly, the normal layout of directory trees in big data sources 
(year/month/day) biases code to all try and work with the same shard.

The currently shipping s3a: client doesn't handle throttling, so that won't be 
the cause of your problem. If you are using EMR and its s3: connector, it may 
be.



Re: CSV write to S3 failing silently with partial completion

2017-09-08 Thread Steve Loughran

On 7 Sep 2017, at 18:36, Mcclintic, Abbi 
> wrote:

Thanks all – couple notes below.

Generally all our partitions are of equal size (ie on a normal day in this 
particular case I see 10 equally sized partitions of 2.8 GB). We see the 
problem with repartitioning and without – in this example we are repartitioning 
to 10 but we also see the problem without any repartitioning when the default 
partition count is 200. We know that data loss is occurring because we have a 
final quality gate that counts the number of rows and halts the process if we 
see too large of a drop.

We have one use case where the data needs to be read on a local machine after 
processing and one use case of copying to redshift. Regarding the redshift 
copy, it gets a bit complicated owing to VPC and encryption requirements so we 
haven’t looked into using the JDBC driver yet.

My understanding was that Amazon EMR does not support 
s3a,
 but it may be worth looking into.

1. No, it doesn't
2. You can't currently use s3a as a direct destination of work due to s3 not 
being consistent, not without a consistency layer on top (S3Guard, etc)

We may also try a combination of writing to HDFS combined with s3distcp.


+1




Re: How to authenticate to ADLS from within spark job on the fly

2017-08-19 Thread Steve Loughran

On 19 Aug 2017, at 02:42, Imtiaz Ahmed 
> wrote:


Hi All,

I am building a spark library which developers will use when writing their 
spark jobs to get access to data on Azure Data Lake. But the authentication 
will depend on the dataset they ask for. I need to call a rest API from within 
spark job to get credentials and authenticate to read data from ADLS. Is that 
even possible? I am new to spark.

E.g, from inside a spark job a user will say:

MyCredentials myCredentials = MyLibrary.getCredentialsForPath(userId, 
"/some/path/on/azure/datalake");

then before spark.read.json("adl://examples/src/main/resources/people.json")
I need to authenticate the user to be able to read that path using the 
credentials fetched above.

Any help is appreciated.

Thanks,
Imtiaz

The ADL filesystem supports addDelegationTokens(), allowing the caller to
collect the delegation tokens of the currently authenticated user & then pass
them along with the request, which is exactly what Spark should be doing in
spark-submit.

if you want to do it yourself, look in SparkHadoopUtils (I think; IDE is closed 
right now) & see how the tokens are picked up and then passed around 
(marshalled over the job request, unmarshalled after & picked up, with bits of 
the UserGroupInformation class doing the low level work)

Java code snippet to write to the path tokenFile:

FileSystem fs = FileSystem.get(conf);
Credentials cred = new Credentials();
Token<?>[] tokens = fs.addDelegationTokens(renewer, cred);
cred.writeTokenStorageFile(tokenFile, conf);

you can then read that file in elsewhere, and then (somehow) get the FS to use
those tokens; a rough sketch of that follows.
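
As an illustration only (the path is hypothetical, and this is one way of doing
it rather than what Spark itself necessarily does), reading the tokens back and
attaching them to the current user looks roughly like this, written here as
Scala against the same Hadoop APIs:

// read the persisted tokens back and add them to the current user's credentials
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

val conf = new Configuration()
val cred = Credentials.readTokenStorageFile(new Path("/path/to/tokenFile"), conf)
UserGroupInformation.getCurrentUser.addCredentials(cred)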

otherwise, ADL supports OAuth, so you may be able to use any OAuth libraries
for this. hadoop-azure-datalake pulls in OkHttp for that:

 
<dependency>
  <groupId>com.squareup.okhttp</groupId>
  <artifactId>okhttp</artifactId>
  <version>2.4.0</version>
</dependency>


-Steve



Re: spark.write.csv is not able write files to specified path, but is writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

2017-08-11 Thread Steve Loughran

On 10 Aug 2017, at 09:51, Hemanth Gudela 
> wrote:

Yeah, installing HDFS in our environment is unfortunately going to take a lot of
time (approvals/planning etc). I will have to live with the local FS for now.
The other option I had already tried is collect() and sending everything to the
driver node. But my data volume is too huge for the driver node to handle alone.

NFS cross mount.


I’m now trying to split the data into multiple datasets, then collect 
individual dataset and write it to local FS on driver node (this approach slows 
down the spark job, but I hope it works).


I doubt it. The job driver is in charge of committing work: renaming data under
_temporary into the right place. Every operation which calls write() to save to
an FS must have the same paths visible to all nodes in the Spark cluster.

A cluster-wide filesystem of some form is mandatory, or you abandon write() and 
implement your own operations to save (partitioned) data


Thank you,
Hemanth

From: Femi Anthony >
Date: Thursday, 10 August 2017 at 11.24
To: Hemanth Gudela 
>
Cc: "user@spark.apache.org" 
>
Subject: Re: spark.write.csv is not able write files to specified path, but is 
writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

Also, why are you trying to write results locally if you're not using a 
distributed file system ? Spark is geared towards writing to a distributed file 
system. I would suggest trying to collect() so the data is sent to the master 
and then do a write if the result set isn't too big, or repartition before 
trying to write (though I suspect this won't really help). You really should 
install HDFS if that is possible.

Sent from my iPhone

On Aug 10, 2017, at 3:58 AM, Hemanth Gudela 
> wrote:
Thanks for reply Femi!

I’m writing the file like this --> 
myDataFrame.write.mode("overwrite").csv("myFilePath")
There absolutely are no errors/warnings after the write.

_SUCCESS file is created on the master node, but the problem of _temporary is
noticed only on the worker nodes.

I know spark.write.csv works best with HDFS, but with the current setup I have 
in my environment, I have to deal with spark write to node’s local file system 
and not to HDFS.

Regards,
Hemanth

From: Femi Anthony >
Date: Thursday, 10 August 2017 at 10.38
To: Hemanth Gudela 
>
Cc: "user@spark.apache.org" 
>
Subject: Re: spark.write.csv is not able write files to specified path, but is 
writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

Normally the _temporary directory gets deleted as part of the cleanup when the 
write is complete and a SUCCESS file is created. I suspect that the writes are 
not properly completed. How are you specifying the write ? Any error messages 
in the logs ?

On Thu, Aug 10, 2017 at 3:17 AM, Hemanth Gudela 
> wrote:
Hi,

I’m running spark on cluster mode containing 4 nodes, and trying to write CSV 
files to node’s local path (not HDFS).
I’m spark.write.csv to write CSV files.

On master node:
spark.write.csv creates a folder with csv file name and writes many files with 
part-r-000n suffix. This is okay for me, I can merge them later.
But on worker nodes:
spark.write.csv creates a folder with csv file name and writes 
many folders and files under _temporary/0/. This is not okay for me.
Could someone please suggest me what could have been going wrong in my 
settings/how to be able to write csv files to the specified folder, and not to 
subfolders (_temporary/0/task_xxx) in worker machines.

Thank you,
Hemanth




--
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre minds." 
- Albert Einstein.



Re: SPARK Issue in Standalone cluster

2017-08-04 Thread Steve Loughran

> On 3 Aug 2017, at 19:59, Marco Mistroni  wrote:
> 
> Hello
>  my 2 cents here, hope it helps
> If you want to just to play around with Spark, i'd leave Hadoop out, it's an 
> unnecessary dependency that you dont need for just running a python script
> Instead do the following:
> - got to the root of our master / slave node. create a directory 
> /root/pyscripts 
> - place your csv file there as well as the python script
> - run the script to replicate the whole directory  across the cluster (i 
> believe it's called copy-script.sh)
> - then run your spark-submit , it will be something lke
> ./spark-submit /root/pyscripts/mysparkscripts.py  
> file:///root/pyscripts/tree_addhealth.csv 10 --master 
> spark://ip-172-31-44-155.us-west-2.compute.internal:7077
> - in your python script, as part of your processing, write the parquet file 
> in directory /root/pyscripts 
> 

That's going to hit the commit problem discussed: only the Spark driver
executes the final commit process; the output from the other servers doesn't
get picked up and promoted. You need a shared store (NFS is the easy one).


> If you have an AWS account and you are versatile with that - you need to 
> setup bucket permissions etc - , you can just
> - store your file in one of your S3 bucket
> - create an EMR cluster
> - connect to master or slave
> - run your  scritp that reads from the s3 bucket and write to the same s3 
> bucket


Aah, and now we are into the problem of implementing a safe commit protocol for
an inconsistent filesystem.

My current stance is that out-of-the-box S3 isn't safe to use as the direct
output of work; Azure is. It mostly works for a small experiment, but I
wouldn't use it in production.

Simplest: work on one machine; if you go to 2-3 machines for exploratory work: NFS.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SPARK Issue in Standalone cluster

2017-08-03 Thread Steve Loughran

On 2 Aug 2017, at 20:05, Gourav Sengupta 
> wrote:

Hi Steve,

I have written a sincere note of apology to everyone in a separate email. I 
sincerely request your kind forgiveness before hand if anything does sound 
impolite in my emails, in advance.

Let me first start by thanking you.

I know it looks like I formed all my opinion based on that document, but that 
is not the case at all. If you or anyone tries to execute the code that I have 
given then they will see what I mean. Code speaks louder and better than words 
for me.

So I am not saying you are wrong. I am asking verify and expecting someone will 
be able to correct  a set of understanding that a moron like me has gained 
after long hours of not having anything better to do.


SCENARIO: there are two files file1.csv and file2.csv stored in HDFS with 
replication 2 and there is a HADOOP cluster of three nodes. All these nodes 
have SPARK workers (executors) running in them.  Both are stored in the 
following way:
---------------------------------------
| SYSTEM 1   | SYSTEM 2   | SYSTEM 3   |
| (worker1)  | (worker2)  | (worker3)  |
| (master)   |            |            |
---------------------------------------
| file1.csv  |            | file1.csv  |
---------------------------------------
|            | file2.csv  | file2.csv  |
---------------------------------------
| file3.csv  | file3.csv  |            |
---------------------------------------





CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
HDFS replication does not store the same file in all the nodes in the cluster. 
So if I have three nodes and the replication is two then the same file will be 
stored physically in two nodes in the cluster. Does that sound right?


HDFS breaks files up into blocks (default = 128MB). If a .csv file is > 128MB
then it will be broken up into blocks

file1.csv -> [block0001, block0002, block0003]

and each block will be replicated. With replication = 2 there will be two
copies of each block, but the file itself can span > 2 hosts.


ASSUMPTION (STEVE PLEASE CLARIFY THIS):
If SPARK is trying to process the records then I am expecting that WORKER2
should not be processing file1.csv, and similarly WORKER1 should not be
processing file2.csv and WORKER3 should not be processing file3.csv. Because in
case WORKER2 was trying to process file1.csv then it would actually cause
network transmission of the file unnecessarily.


Spark prefers to schedule work locally, so as to save on network traffic, but
it optimises for execution time rather than waiting for free workers on the node
with the data. If a block is on nodes 2 and 3 but there is only a free thread on
node 1, then node 1 gets the work.

There are details on whether/how work across blocks takes place which I'm
avoiding. For now, know that those formats which are "splittable" will have work
scheduled by block. If you use Parquet/ORC/Avro for your data and compress with
Snappy, it will be split. This gives you maximum performance as >1 thread can
work on different blocks. That is, if file1 is split into three blocks, three
worker threads can process it.


ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY THIS):
if WORKER2 is not processing file1.csv then how does it matter whether the
file is there or not at all in the system? Should not SPARK just ask the
workers to process the files which are available in the worker nodes? In case
both WORKER2 and WORKER3 fail and are not available then file2.csv will not be
processed at all.


locality is best-effort, not guaranteed.


ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE EXECUTED 
(Its been pointed out that I am learning SPARK, and even I did not take more 
than 13 mins to set up the cluster and run the code).

Once you execute the code then you will find that:
1.  if the path starts with file:/// while reading back then there is no error 
reported, but the number of records reported back are only those records in the 
worker which also has the server.
2. also you will notice that once you cache the file before writing, the
partitions are distributed nicely across the workers, and while writing back,
the dataframe partitions do write properly to the worker node on the master,
but the workers on the other systems have the files written to a _temporary
folder which does not get copied back to the main folder. In spite of this the
job is not reported as failed in SPARK.

This gets into the "commit protocol". You don't want to know all the dirty
details (*) but essentially it's this:

1. Every worker writes its output to a directory under the destination
directory, something like
'$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
2. it is the spark driver which "commits" the job by moving the committed task
output from under _temporary into the destination directory.

Re: PySpark Streaming S3 checkpointing

2017-08-02 Thread Steve Loughran

On 2 Aug 2017, at 10:34, Riccardo Ferrari 
> wrote:

Hi list!

I am working on a pyspark streaming job (ver 2.2.0) and I need to enable 
checkpointing. At high level my python script goes like this:

class StreamingJob():

    def __init__(..):
        ...
        sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key',)
        sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key',)

    def doJob(self):
        ssc = StreamingContext.getOrCreate('', )

and I run it:

myJob = StreamingJob(...)
myJob.doJob()

The problem is that StreamingContext.getOrCreate is not able to have access to 
hadoop configuration configured in the constructor and fails to load from 
checkpoint with

"com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
provider in the chain"

If I export AWS credentials to the system ENV before starting the script it 
works!


Spark magically copies the env vars over for you when you launch a job


I see the Scala version has an option to provide the hadoop configuration that 
is not available in python

I don't have the whole Hadoop, just Spark, so I don't really want to configure 
hadoop's xmls and such


Set them when you set up the context, e.g. in spark-defaults.conf:

spark.hadoop.fs.s3a.access.key=access key
spark.hadoop.fs.s3a.secret.key=secret key

Reminder: Do keep your secret key a secret, avoid checking it in to any form of 
revision control.


Re: SPARK Issue in Standalone cluster

2017-08-02 Thread Steve Loughran

On 2 Aug 2017, at 14:25, Gourav Sengupta 
> wrote:

Hi,

I am definitely sure that at this point of time everyone who has kindly cared 
to respond to my query do need to go and check this link 
https://spark.apache.org/docs/2.2.0/spark-standalone.html#spark-standalone-mode.

I see. Well, we shall have to edit that document to make clear something which 
had been omitted:

in order for multiple spark workers to process data, they must have a shared
store for that data, one with read/write access for all workers. This must
be provided by a shared filesystem: HDFS, network-mounted NFS, Glusterfs,
an object store (S3, Azure WASB, ...), or an alternative
datastore implementing the Hadoop Filesystem API (example: Apache Cassandra).

n your case, for a small cluster of 1-3 machines, especially if you are just 
learning to play with spark, I'd start with an NFS mounted disk accessible on 
the same path on all machines. If you aren't willing to set that up, stick to 
spark standalone on a single machine first. You don't need a shared cluster to 
use spark standalone.
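As a minimal PySpark sketch of checking that a store really is shared (the mount point is hypothetical), the same path must resolve to the same data on the driver and on every worker, otherwise the read back undercounts:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shared-store-check").getOrCreate()

out = "file:///mnt/shared/test1"   # hypothetical NFS mount, visible on all nodes

df = spark.range(0, 1000000)       # small synthetic dataframe
df.write.mode("overwrite").parquet(out)

# On a genuinely shared filesystem this prints 1000000. With per-node local
# directories, partitions written on other hosts never leave _temporary and
# the count comes up short.
print(spark.read.parquet(out).count())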

Personally, I'd recommend downloading apache zeppelin and running it locally as 
the simplest out-the-box experience.


It does mention that a SPARK standalone cluster can have multiple machines
running as slaves.


Clearly it omits the small detail about the requirement for a shared store.

The general idea of writing to the user group is that people who know should 
answer, and not those who do not know.

Agreed, but if the answer doesn't appear to be correct to you, do consider that 
there may be some detail that hasn't been mentioned, rather than immediately 
concluding that the person replying is wrong.

-Steve





Regards,
Gourav Sengupta

On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker wrote:
Gourav,
Riccardo’s answer is spot on.
What is happening is one node of spark is writing to its own directory and 
telling a slave to read the data from there; when the slave goes to read it,
the part is not found.

Check the folder 
Users/gouravsengupta/Development/spark/sparkdata/test1/part-1-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
 on the slave.
The reason it ran on spark 1.5 may have been because the executor ran on the 
driver itself. There is not much use to a set up where you don’t have some kind 
of distributed file system, so I would encourage you to use hdfs, or a mounted 
file system shared by all nodes.

Regards,
Mahesh


From: Gourav Sengupta 
[mailto:gourav.sengu...@gmail.com]
Sent: Monday, July 31, 2017 9:54 PM
To: Riccardo Ferrari
Cc: user
Subject: Re: SPARK Issue in Standalone cluster

Hi Riccardo,

I am grateful for your kind response.

Also I am sure that your answer is completely wrong and erroneous. SPARK must
be having a method so that different executors do not pick up the same files to 
process. You also did not answer the question why was the processing successful 
in SPARK 1.5 and not in SPARK 2.2.

Also, the exact same directory is present on both the nodes.

I feel quite fascinated when individuals respond before even understanding the
issue, or trying out the code.

It will be of great help if someone could kindly read my email and help me 
figure out the issue.


Regards,
Gourav Sengupta



On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari wrote:
Hi Gourav,

The issue here is the location where you're trying to write/read from:
/Users/gouravsengupta/Development/spark/sparkdata/test1/p...
When dealing with clusters all the paths and resources should be available to
all executors (and driver), and that is the reason why you generally use HDFS,
S3, NFS or any shared file system.

Spark assumes your data is generally available to all nodes and does not try
to pick up the data from a selected node; it rather tries to write/read in
parallel from the executor nodes. Also, given its control logic, there is no way
(read: you should not care) to know which executor is doing what task.

Hope it helps,
Riccardo

On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta wrote:
Hi,

I am working by creating a native SPARK standalone cluster 
(https://spark.apache.org/docs/2.2.0/spark-standalone.html)

Therefore I  do not have a HDFS.


EXERCISE:
It's the most fundamental and simple exercise. Create a sample SPARK dataframe
and then write it to a location and then read it back.

SETTINGS:
So after I have installed SPARK in two physical systems with the same:
1. SPARK version,
2. JAVA version,
3. PYTHON_PATH
4. SPARK_HOME
5. PYSPARK_PYTHON
the user in both the systems is the root user therefore there are no permission 
issues anywhere.

I am able to start:
1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
2. 

Re: Spark, S3A, and 503 SlowDown / rate limit issues

2017-07-12 Thread Steve Loughran

On 10 Jul 2017, at 21:57, Everett Anderson <ever...@nuna.com> wrote:

Hey,

Thanks for the responses, guys!

On Thu, Jul 6, 2017 at 7:08 AM, Steve Loughran <ste...@hortonworks.com> wrote:

On 5 Jul 2017, at 14:40, Vadim Semenov <vadim.seme...@datadoghq.com> wrote:

Are you sure that you use S3A?
Because EMR says that they do not support S3A

https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/
> Amazon EMR does not currently support use of the Apache Hadoop S3A file 
> system.

Oof. I figured they didn't offer technical support for S3A, but didn't know 
that there was something saying EMR does not support use of S3A. My impression 
was that many people were using it and it's the recommended S3 library in 
Hadoop 2.7+ (https://wiki.apache.org/hadoop/AmazonS3) from Hadoop's point of
view.

We're using it rather than S3N because we use encrypted buckets, and I don't 
think S3N supports picking up credentials from a machine role. Also, it was a 
bit distressing that it's unmaintained and has open bugs.

We're S3A rather than EMRFS because we have a setup where we submit work to a 
cluster via spark-submit run outside the cluster master node with --master 
yarn. When you do this, the Hadoop configuration accessible to spark-submit 
overrides that of the EMR cluster itself. If you use a configuration that uses 
EMRFS and any of the resources (like the JAR) you give to spark-submit are on 
S3, spark-submit will instantiate the EMRFS FileSystem impl, which is currently 
only available on the cluster, and fail. That said, we could work around this 
by resetting the configuration in code.


or, if you are using the URL s3:// to refer to amazon EMRs, just edit your app 
config so that fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem  and use s3:// 
everywhere (use the fs.s3a. prefix for configuring s3 though)



I think that the HEAD requests come from the `createBucketIfNotExists` in the 
AWS S3 library that checks if the bucket exists every time you do a PUT 
request, i.e. creates a HEAD request.

You can disable that by setting `fs.s3.buckets.create.enabled` to `false`
http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html
Oh, interesting. We are definitely seeing a ton of HEAD requests, which might 
be that. It looks like the `fs.s3.buckets.create.enabled` is an EMRFS option, 
though, not one common to the Hadoop S3 FileSystem implementations. Does that 
sound right?




Yeah, I'd like to see the stack traces before blaming S3A and the ASF codebase

(Sorry, to be clear -- I'm not trying to blame S3A. I figured someone else 
might've hit this and bet we had just misconfigured something or were doing 
this the wrong way.)

no worries... if you are seeing problems, it's important to know where they are
surfacing.



One thing I do know is that the shipping S3A client doesn't have any explicit 
handling of 503/retry events. I know that: 
https://issues.apache.org/jira/browse/HADOOP-14531

There is some retry logic in bits of the AWS SDK related to file upload: that 
may log and retry, but in all the operations listing files, getting their 
details, etc: no resilience to throttling.

If it is surfacing against s3a, there isn't anything which can immediately be 
done to fix it, other than "spread your data around more buckets". Do attach 
the stack trace you get under 
https://issues.apache.org/jira/browse/HADOOP-14381 though: I'm about half-way 
through the resilience code (& fault injection needed to test it). The more 
where I can see problems arise, the more confident I can be that those 
codepaths will be resilient.

Will do!

We did end up finding that some of our jobs were sharding data way too finely, 
ending up with 5-10k+ tiny Parquet shards per table. This happened when we 
unioned many Spark DataFrames together without doing a repartition or coalesce 
afterwards. After throwing in a repartition (to additionally balance the output 
shards) we haven't seen the error again, but our graphs of S3 HEAD requests
are still rather alarmingly high.



treewalking can be expensive that way; the more dirs you have, the more the
clients have to look around.

If you are using S3A, and Hadoop 2.8+, log the toString() value of the FS after 
your submission. It'll give you a list of all the stats it collects, including 
details fo high level API calls alongside low level HTTP requests: 
https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
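On the sharding point: here is a hedged sketch of the repartition-before-write pattern (paths and the target partition count are made up), which stops a union of many DataFrames from emitting thousands of tiny Parquet objects:

from functools import reduce
from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.getOrCreate()

# hypothetical per-day extracts being unioned together
frames = [spark.read.parquet("s3a://bucket/raw/day=%02d" % d) for d in range(1, 31)]
combined = reduce(DataFrame.union, frames)

# Without this, the union keeps every input partition and the write produces
# thousands of small files; 64 is just an illustrative target.
combined.repartition(64).write.mode("overwrite").parquet("s3a://bucket/merged/")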






Re: Using Spark as a simulator

2017-07-07 Thread Steve Loughran

On 7 Jul 2017, at 08:37, Esa Heikkinen wrote:


I only want to simulate a very huge "network" with even millions of parallel,
time-synchronized actors (state machines). There is also communication between
actors via some (key-value pair) database. I also want the simulation to
work in real time.

I don't know what would be the best framework or tool for that kind of
simulation. I think Akka would be the best and easiest to deploy?
Or do you know better frameworks or tools?


find someone in the sciences whose problems involve state between operations 
(oceanography, atmosphere, chemistry) and find out what they do. If they say 
"C++ and MPI", show caution, if not actually running away.

If you can get away with doing the simulation on a single machine with GPUs, 
you will probably get better performance than trying to use the big data tools, 
which are biased towards running through GB of data on storage or coming in 
from external sources in a dataflow pipeline


Re: Spark, S3A, and 503 SlowDown / rate limit issues

2017-07-06 Thread Steve Loughran

On 5 Jul 2017, at 14:40, Vadim Semenov wrote:

Are you sure that you use S3A?
Because EMR says that they do not support S3A

https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/
> Amazon EMR does not currently support use of the Apache Hadoop S3A file 
> system.

I think that the HEAD requests come from the `createBucketIfNotExists` in the 
AWS S3 library that checks if the bucket exists every time you do a PUT 
request, i.e. creates a HEAD request.

You can disable that by setting `fs.s3.buckets.create.enabled` to `false`
http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html



Yeah, I'd like to see the stack traces before blaming S3A and the ASF codebase

One thing I do know is that the shipping S3A client doesn't have any explicit 
handling of 503/retry events. I know that: 
https://issues.apache.org/jira/browse/HADOOP-14531

There is some retry logic in bits of the AWS SDK related to file upload: that 
may log and retry, but in all the operations listing files, getting their 
details, etc: no resilience to throttling.

If it is surfacing against s3a, there isn't anything which can immediately be 
done to fix it, other than "spread your data around more buckets". Do attach 
the stack trace you get under 
https://issues.apache.org/jira/browse/HADOOP-14381 though: I'm about half-way 
through the resilience code (& fault injection needed to test it). The more 
where I can see problems arise, the more confident I can be that those 
codepaths will be resilient.


On Thu, Jun 29, 2017 at 4:56 PM, Everett Anderson wrote:
Hi,

We're using Spark 2.0.2 + Hadoop 2.7.3 on AWS EMR with S3A for direct I/O 
from/to S3 from our Spark jobs. We set 
mapreduce.fileoutputcommitter.algorithm.version=2 and are using encrypted S3 
buckets.

This has been working fine for us, but perhaps as we've been running more jobs 
in parallel, we've started getting errors like

Status Code: 503, AWS Service: Amazon S3, AWS Request ID: ..., AWS Error Code: 
SlowDown, AWS Error Message: Please reduce your request rate., S3 Extended 
Request ID: ...

We enabled CloudWatch S3 request metrics for one of our buckets and I was a 
little alarmed to see spikes of over 800k S3 requests over a minute or so, with 
the bulk of them HEAD requests.

We read and write Parquet files, and most tables have around 50 shards/parts, 
though some have up to 200. I imagine there's additional parallelism when 
reading a shard in Parquet, though.

Has anyone else encountered this? How did you solve it?

I'd sure prefer to avoid copying all our data in and out of HDFS for each job, 
if possible.

Thanks!





Re: Spark querying parquet data partitioned in S3

2017-07-05 Thread Steve Loughran

> On 29 Jun 2017, at 17:44, fran  wrote:
> 
> We have got data stored in S3 partitioned by several columns. Let's say
> following this hierarchy:
> s3://bucket/data/column1=X/column2=Y/parquet-files
> 
> We run a Spark job in a EMR cluster (1 master,3 slaves) and realised the
> following:
> 
> A) - When we declare the initial dataframe to be the whole dataset (val df =
> sqlContext.read.parquet("s3://bucket/data/) then the driver splits the job
> into several tasks (259) that are performed by the executors and we believe
> the driver gets back the parquet metadata.
> 
> Question: The above takes about 25 minutes for our dataset, we believe it
> should be a lazy query (as we are not performing any actions) however it
> looks like something is happening, all the executors are reading from S3. We
> have tried mergeData=false and setting the schema explicitly via
> .schema(someSchema). Is there any way to speed this up?
> 
> B) - When we declare the initial dataframe to be scoped by the first column
> (val df = sqlContext.read.parquet("s3://bucket/data/column1=X) then it seems
> that all the work (getting the parquet metadata) is done by the driver and
> there is no job submitted to Spark. 
> 
> Question: Why does (A) send the work to executors but (B) does not?
> 
> The above is for EMR 5.5.0, Hadoop 2.7.3 and Spark 2.1.0.
> 
> 

Split calculation can be very slow against object stores, especially if the 
directory structure is deep: the treewalking done here is pretty inefficient 
against the object store.

Then there's the schema merge, which looks at the tail of every file, so has to 
do a seek() against all of them. That is something which it parallelises around 
the cluster, before your job actually gets scheduled. 

Turning that off with spark.sql.parquet.mergeSchema = false should make it go 
away, but clearly not.

A call to jstack against the driver will show where it is at: you'll probably
have to start from there


I know if you are using EMR you are stuck using Amazon's own s3 clients; if you
were on Apache's own artifacts you could move up to Hadoop 2.8 and set the 
spark.hadoop.fs.s3a.experimental.fadvise=random option for high speed random 
access. You can also turn off job summary creation in Spark
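A hedged sketch of the read being discussed (bucket, columns and types are made up): turn schema merging off and supply the schema explicitly so no file footers have to be read at planning time, and scope the path to the partition you need.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.getOrCreate()

schema = StructType([                    # illustrative schema
    StructField("id", LongType()),
    StructField("value", StringType()),
])

df = (spark.read
      .option("mergeSchema", "false")    # skip the per-file footer scan
      .schema(schema)                    # skip schema inference entirely
      .parquet("s3a://bucket/data/column1=X"))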


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Question on Spark code

2017-06-26 Thread Steve Loughran

On 25 Jun 2017, at 20:57, kant kodali wrote:

impressive! I need to learn more about scala.

What I mean stripping away conditional check in Java is this.

static final boolean isLogInfoEnabled = false;

public void logMessage(String message) {
    if (isLogInfoEnabled) {
        log.info(message);
    }
}

If you look at the byte code the dead if check will be removed.





Generally it's skipped in Java too now that people have moved to the SLF4J
APIs, which do on-demand string expansion

LOG.info("network IO failure from {} source to {}", src,
dest, ex). That only builds the final string, calling src.toString() and
dest.toString(), when needed; handling null values too. So you can skip those
guards everywhere. But the string template is still constructed; it's not free,
and there's some merit in maintaining the guard @ debug level, though I don't
personally bother.

The spark one takes a closure, so it can do much more. However, you shouldn't 
do anything with side effects, or indeed, anything prone to throwing 
exceptions. Always try to write .toString() methods which are robust against 
null values, that is: valid for the entire life of an instance. Your debuggers 
will appreciate it too.



Re: Question about standalone Spark cluster reading from Kerberosed hadoop

2017-06-23 Thread Steve Loughran

On 23 Jun 2017, at 10:22, Saisai Shao wrote:

Spark running with standalone cluster manager currently doesn't support 
accessing security Hadoop. Basically the problem is that standalone mode Spark 
doesn't have the facility to distribute delegation tokens.

Currently only Spark on YARN or local mode supports security Hadoop.

Thanks
Jerry


There's possibly an ugly workaround where you ssh in to every node and log in
directly to your kdc using a keytab you pushed out... that would eliminate the
need for anything related to hadoop tokens. After all, that's essentially what
spark-on-yarn does when you give it a keytab.


see also:  
https://www.gitbook.com/book/steveloughran/kerberos_and_hadoop/details

On Fri, Jun 23, 2017 at 5:10 PM, Mu Kong wrote:
Hi, all!

I was trying to read from a Kerberosed hadoop cluster from a standalone spark 
cluster.
Right now, I encountered some authentication issues with Kerberos:



java.io.IOException: Failed on local exception: java.io.IOException: 
org.apache.hadoop.security.AccessControlException: Client cannot authenticate 
via:[TOKEN, KERBEROS]; Host Details : local host is: ""; 
destination host is: XXX;


I checked with klist, and principle/realm is correct.
I also used hdfs command line to poke HDFS from all the nodes, and it worked.
And if I submit job using local(client) mode, the job worked fine.

I tried to put everything from hadoop/conf to spark/conf and hive/conf to 
spark/conf.
Also tried edit spark/conf/spark-env.sh to add 
SPARK_SUBMIT_OPTS/SPARK_MASTER_OPTS/SPARK_SLAVE_OPTS/HADOOP_CONF_DIR/HIVE_CONF_DIR,
 and tried to export them in .bashrc as well.

However, I'm still experiencing the same exception.

Then I read some concerning posts about problems with kerberosed hadoop, some 
post like the following one:
http://blog.stratio.com/spark-kerberos-safe-story/
, which indicates that we cannot access kerberosed hdfs using a standalone
spark cluster.

I'm using spark 2.1.1, is it still the case that we can't access kerberosed 
hdfs with 2.1.1?

Thanks!


Best regards,
Mu





Re: Using YARN w/o HDFS

2017-06-23 Thread Steve Loughran
you'll need a filesystem with

* consistency
* accessibility everywhere
* supports a binding through one of the hadoop fs connectors

NFS-style distributed filesystems work with file:// ; things like glusterfs 
need their own connectors.

you can use azure's wasb:// as a drop-in replacement for HDFS in Azure. I think
google cloud storage is similar, but haven't played with it. Ask google.

You cannot do the same for S3 except on EMR and Amazon's premium emrfs:// 
offering, which adds the consistency layer.



On 22 Jun 2017, at 00:50, Alaa Zubaidi (PDF) wrote:

Hi,

Can we run Spark on YARN with out installing HDFS?
If yes, where would HADOOP_CONF_DIR point to?

Regards,

This message may contain confidential and privileged information. If it has 
been sent to you in error, please reply to advise the sender of the error and 
then immediately permanently delete it and all attachments to it from your 
systems. If you are not the intended recipient, do not read, copy, disclose or 
otherwise use this message or any attachments to it. The sender disclaims any 
liability for such unauthorized use. PLEASE NOTE that all incoming e-mails sent 
to PDF e-mail accounts will be archived and may be scanned by us and/or by 
external service providers to detect and prevent threats to our systems, 
investigate illegal or inappropriate behavior, and/or eliminate unsolicited 
promotional e-mails (“spam”). If you have any concerns about this process, 
please contact us at legal.departm...@pdf.com.



Re: SparkSQL not able to read a empty table location

2017-05-20 Thread Steve Loughran

On 20 May 2017, at 01:44, Bajpai, Amit X. -ND wrote:

Hi,

I have a hive external table with the S3 location having no files (but the S3 
location directory does exist). When I am trying to use Spark SQL to count the
number of records in the table it is throwing an error saying "File s3n://data/xyz
does not exist. null/0”.

select * from tablex limit 10

Can someone let me know how we can fix this issue.

Thanks


There isn't really a "directory" in S3, just a set of objects whose paths begin 
with a string. Try creating an empty file with an _ prefix in the directory; it 
should be ignored by Spark SQL but will cause the "directory" to come into being.
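A hedged sketch of that workaround with boto3 (the bucket and key follow the s3n://data/xyz path in the error message and are only illustrative):

import boto3

s3 = boto3.client("s3")

# Zero-byte marker whose name starts with "_": Spark SQL ignores it,
# but the prefix now lists as a non-empty "directory".
s3.put_object(Bucket="data", Key="xyz/_placeholder", Body=b"")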


Re: Spark <--> S3 flakiness

2017-05-18 Thread Steve Loughran

On 18 May 2017, at 05:29, lucas.g...@gmail.com wrote:

Steve, just to clarify:

"FWIW, if you can move up to the Hadoop 2.8 version of the S3A client it is way 
better on high-performance reads, especially if you are working with column 
data and can set the fs.s3a.experimental.fadvise=random option. "

Are you talking about the hadoop-aws lib or hadoop itself.  I see that spark is 
currently only pre-built against hadoop 2.7.

all the hadoop JARs. It's a big move, and really I'd hold off for it except in
the special case: spark standalone on my desktop

Most of our failures are on write, the other fix I've seen advertised has been: 
"fileoutputcommitter.algorithm.version=2"

this eliminates the big rename() in job commit, renaming the work of individual 
tasks at the end of each task commit.

It doesn't do anything for problems writing data, and it still has a 
fundamental flaw: to rename everything in a "directory tree", you need to be 
able to list all objects under a path, which utterly depends on consistent 
directory listings. Amazon S3 doesn't offer that: you can create a file, then 
list the bucket *and not see the file*. Similarly, after deletion it may be 
listed, but not be there any more. Without that consistent listing, you don't 
get reliable renames, hence output.

It's possible that you may not even notice the fact that data hasn't been 
copied over.

Ryan's committer avoids this problem by using the local filesystem and HDFS 
cluster as the consistent stores, and using uncompleted S3A multipart uploads 
to eliminate the rename at the end

https://github.com/rdblue/s3committer

see also: https://www.youtube.com/watch?v=8F2Jqw5_OnI



Still doing some reading and will start testing in the next day or so.

Thanks!

Gary

On 17 May 2017 at 03:19, Steve Loughran <ste...@hortonworks.com> wrote:

On 17 May 2017, at 06:00, lucas.g...@gmail.com wrote:

Steve, thanks for the reply.  Digging through all the documentation now.

Much appreciated!



FWIW, if you can move up to the Hadoop 2.8 version of the S3A client it is way 
better on high-performance reads, especially if you are working with column 
data and can set the fs.s3a.experimental.fadvise=random option.

That's in apache Hadoop 2.8, HDP 2.5+, and I suspect also the latest versions 
of CDH, even if their docs don't mention it

https://hortonworks.github.io/hdp-aws/s3-performance/
https://www.cloudera.com/documentation/enterprise/5-9-x/topics/spark_s3.html


On 16 May 2017 at 10:10, Steve Loughran <ste...@hortonworks.com> wrote:

On 11 May 2017, at 06:07, lucas.g...@gmail.com wrote:

Hi users, we have a bunch of pyspark jobs that are using S3 for loading / 
intermediate steps and final output of parquet files.

Please don't, not without a committer specially written to work against S3 in 
the presence of failures. You are at risk of things going wrong and you not even
noticing.

The only one that I trust to do this right now is; 
https://github.com/rdblue/s3committer


see also : https://github.com/apache/spark/blob/master/docs/cloud-integration.md



We're running into the following issues on a semi regular basis:
* These are intermittent errors, IE we have about 300 jobs that run nightly... 
And a fairly random but small-ish percentage of them fail with the following 
classes of errors.

S3 write errors

"ERROR Utils: Aborting task
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS 
Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS Error 
Message: Not Found, S3 Extended Request ID: BlaBlahEtc="

"Py4JJavaError: An error occurred while calling o43.parquet.
: com.amazonaws.services.s3.model.MultiObjectDeleteException: Status Code: 0, 
AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS Error 
Message: One or more objects could not be deleted, S3 Extended Request ID: null"


S3 Read Errors:

[Stage 1:=>   (27 + 4) / 
31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage 1.0 (TID 
11)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
at sun.security.ssl.InputRecord.read(InputRecord.java:509)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
at 
org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer

Re: s3 bucket access/read file

2017-05-17 Thread Steve Loughran

On 17 May 2017, at 00:10, jazzed wrote:

How did you solve the problem with V4?


which v4 problem? Authentication?

you need to declare the explicit s3a endpoint via fs.s3a.endpoint, otherwise
you get a generic "bad auth" message which is not a good place to start 
debugging from

full list here: https://hortonworks.github.io/hdp-aws/s3-configure/index.html
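For reference, a hedged sketch of setting the endpoint from Spark (the region endpoint and bucket are illustrative; the full property list is in the link above):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("s3a-v4-auth")
         # V4-only regions such as Frankfurt need the region-specific endpoint
         .config("spark.hadoop.fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
         .getOrCreate())

df = spark.read.text("s3a://my-bucket/some/path")   # hypothetical location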





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/s3-bucket-access-read-file-tp23536p28688.html
Sent from the Apache Spark User List mailing list archive at 
Nabble.com.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





Re: Parquet file amazon s3a timeout

2017-05-17 Thread Steve Loughran

On 17 May 2017, at 11:13, Karin Valisova wrote:

Hello!
I'm working with some parquet files saved on amazon service and loading them to 
dataframe with

Dataset df = spark.read() .parquet(parketFileLocation);

however, after some time I get the "Timeout waiting for connection from pool" 
exception. I hope I'm not mistaken, but I think that there's the limitation for 
the length of any open connection with s3a, but I have enough local memory to 
actually just load the file and close the connection.

1. Which version of the Hadoop binaries? You should be using Hadoop 2.7.x for
S3a to start working properly (see
https://issues.apache.org/jira/browse/HADOOP-11571 for the list of issues)

2. If you move up to 2.7 & still see the exception, can you paste the  full 
stack trace?


Is it possible to specify some option when reading the parquet to store the 
data locally and release the connection? Or any other ideas on how to solve the 
problem?


If the problem is still there with Hadoop 2.7 binaries, then there are some
thread pool options related to the AWS transfer manager and some other pooling 
going on, as well as the setting fs.s3a.connection.maximum to play with


http://hadoop.apache.org/docs/r2.7.3/hadoop-aws/tools/hadoop-aws/index.html


though as usual, people are always finding new corner cases to deadlock. Here I 
suspect https://issues.apache.org/jira/browse/HADOOP-13826; which is fixed in 
Hadoop 2.8+
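If it does turn out to be pool exhaustion rather than that deadlock, here is a hedged sketch of raising the pool size from Spark (the value and path are only illustrative):

from pyspark import SparkConf
from pyspark.sql import SparkSession

# bigger pool for jobs with many threads reading S3A at once
conf = SparkConf().set("spark.hadoop.fs.s3a.connection.maximum", "100")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

df = spark.read.parquet("s3a://my-bucket/parquet-data")   # hypothetical location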

-Steve


Re: Spark <--> S3 flakiness

2017-05-17 Thread Steve Loughran

On 17 May 2017, at 06:00, lucas.g...@gmail.com wrote:

Steve, thanks for the reply.  Digging through all the documentation now.

Much appreciated!



FWIW, if you can move up to the Hadoop 2.8 version of the S3A client it is way 
better on high-performance reads, especially if you are working with column 
data and can set the fs.s3a.experimental.fadvise=random option.

That's in apache Hadoop 2.8, HDP 2.5+, and I suspect also the latest versions 
of CDH, even if their docs don't mention it

https://hortonworks.github.io/hdp-aws/s3-performance/
https://www.cloudera.com/documentation/enterprise/5-9-x/topics/spark_s3.html


On 16 May 2017 at 10:10, Steve Loughran <ste...@hortonworks.com> wrote:

On 11 May 2017, at 06:07, lucas.g...@gmail.com wrote:

Hi users, we have a bunch of pyspark jobs that are using S3 for loading / 
intermediate steps and final output of parquet files.

Please don't, not without a committer specially written to work against S3 in 
the presence of failures. You are at risk of things going wrong and you not even
noticing.

The only one that I trust to do this right now is; 
https://github.com/rdblue/s3committer


see also : https://github.com/apache/spark/blob/master/docs/cloud-integration.md



We're running into the following issues on a semi regular basis:
* These are intermittent errors, IE we have about 300 jobs that run nightly... 
And a fairly random but small-ish percentage of them fail with the following 
classes of errors.

S3 write errors

"ERROR Utils: Aborting task
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS 
Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS Error 
Message: Not Found, S3 Extended Request ID: BlaBlahEtc="

"Py4JJavaError: An error occurred while calling o43.parquet.
: com.amazonaws.services.s3.model.MultiObjectDeleteException: Status Code: 0, 
AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS Error 
Message: One or more objects could not be deleted, S3 Extended Request ID: null"


S3 Read Errors:

[Stage 1:=>   (27 + 4) / 
31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage 1.0 (TID 
11)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
at sun.security.ssl.InputRecord.read(InputRecord.java:509)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
at 
org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
at
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
at
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:200)
at
org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:103)
at 
org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:168)
at 
org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
at 
org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:187)


We have literally tons of logs we can add but it would make the email unwieldy 
big.  If it would be helpful I'll drop them in a pastebin or something.

Our config is along the lines of:

  *   spark-2.1.0-bin-hadoop2.7
  *   '--packages 
com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 
pyspark-shell'

You should have the Hadoop 2.7 JARs on your CP, as s3a on 2.6 wasn't ready to 
play with. In particular, in a close() call it reads to the end of the stream, 
which is a performance killer on large files. That stack trace you see is from 
that same phase of operation, so should go away too.

Hadoop 2.7.3 depends on Amazon SDK 1.7.4; trying to use a different one will 
probably cause link errors.
http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/2.7.3

Also: make sure Joda time >= 2.8.1 for Java 8

If you go up to 2.8.0, and you still see the errors, fi

Re: Spark <--> S3 flakiness

2017-05-16 Thread Steve Loughran

On 11 May 2017, at 06:07, lucas.g...@gmail.com 
wrote:

Hi users, we have a bunch of pyspark jobs that are using S3 for loading / 
intermediate steps and final output of parquet files.

Please don't, not without a committer specially written to work against S3 in 
the presence of failures. You are at risk of things going wrong and you not even
noticing.

The only one that I trust to do this right now is; 
https://github.com/rdblue/s3committer


see also : https://github.com/apache/spark/blob/master/docs/cloud-integration.md



We're running into the following issues on a semi regular basis:
* These are intermittent errors, IE we have about 300 jobs that run nightly... 
And a fairly random but small-ish percentage of them fail with the following 
classes of errors.

S3 write errors

"ERROR Utils: Aborting task
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS 
Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS Error 
Message: Not Found, S3 Extended Request ID: BlaBlahEtc="

"Py4JJavaError: An error occurred while calling o43.parquet.
: com.amazonaws.services.s3.model.MultiObjectDeleteException: Status Code: 0, 
AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS Error 
Message: One or more objects could not be deleted, S3 Extended Request ID: null"


S3 Read Errors:

[Stage 1:=>   (27 + 4) / 
31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage 1.0 (TID 
11)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
at sun.security.ssl.InputRecord.read(InputRecord.java:509)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
at 
org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
at 
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
at 
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:200)
at 
org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:103)
at 
org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:168)
at 
org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
at 
org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:187)


We have literally tons of logs we can add but it would make the email unwieldy 
big.  If it would be helpful I'll drop them in a pastebin or something.

Our config is along the lines of:

  *   spark-2.1.0-bin-hadoop2.7
  *   '--packages 
com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 
pyspark-shell'

You should have the Hadoop 2.7 JARs on your CP, as s3a on 2.6 wasn't ready to 
play with. In particular, in a close() call it reads to the end of the stream, 
which is a performance killer on large files. That stack trace you see is from 
that same phase of operation, so should go away too.

Hadoop 2.7.3 depends on Amazon SDK 1.7.4; trying to use a different one will 
probably cause link errors.
http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/2.7.3

Also: make sure Joda time >= 2.8.1 for Java 8

If you go up to 2.8.0, and you still see the errors, file something against 
HADOOP in JIRA


Given the stack overflow / googling I've been doing I know we're not the only 
org with these issues but I haven't found a good set of solutions in those 
spaces yet.

Thanks!

Gary Lucas



Re: [WARN] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

2017-05-16 Thread Steve Loughran

On 10 May 2017, at 13:40, Mendelson, Assaf wrote:

Hi all,
When running spark I get the following warning: [WARN] 
org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Now I know that in general it is possible to ignore this warning, however, it 
means that utilities that catch “WARN” in the log keep flagging this.
I saw many answers to handling this (e.g. 
http://stackoverflow.com/questions/30369380/hadoop-unable-to-load-native-hadoop-library-for-your-platform-error-on-docker,
 
http://stackoverflow.com/questions/19943766/hadoop-unable-to-load-native-hadoop-library-for-your-platform-warning,
http://stackoverflow.com/questions/40015416/spark-unable-to-load-native-hadoop-library-for-your-platform),
 however, I am unable to solve this on my local machine.
Specifically, I can’t find any such solution for windows (i.e. when running 
developer local builds) or on a centos 7 machine with no HDFS (basically it is 
a single node machine which uses spark standalone for testing).


Log4J is your friend. I usually have (at least)

log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN

if you are working on Windows though, you do actually need the native libraries 
and winutils.exe on your path, or things won't work

Any help would be appreciated.

Thanks,
  Assaf.



Re: parquet optimal file structure - flat vs nested

2017-05-03 Thread Steve Loughran

> On 30 Apr 2017, at 09:19, Zeming Yu  wrote:
> 
> Hi,
> 
> We're building a parquet based data lake. I was under the impression that 
> flat files are more efficient than deeply nested files (say 3 or 4 levels 
> down). Is that correct?
> 
> Thanks,
> Zeming

Where's the data going to live: HDFS or an object store? If it's somewhere like 
Amazon S3 I'd be biased towards the flatter structure, as how the client
libraries mimic treewalking is pretty expensive in terms of HTTP calls, and
those calls all take place during the initial, serialized, query planning
stage.



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: removing columns from file

2017-05-01 Thread Steve Loughran

On 28 Apr 2017, at 16:10, Anubhav Agarwal wrote:

Are you using Spark's textFiles method? If so, go through this blog :-
http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219


old/dated blog post.

If you get the Hadoop 2.8 binaries on your classpath, s3a does a full directory 
tree listing if you give it a simple path like "s3a://bucket/events". The 
example in that post was using a complex wildcard which hasn't yet been speeded 
up as it's pretty hard to do it in a way which works effectively everywhere.

Having all your data in 1 dir works nicely.


Anubhav

On Mon, Apr 24, 2017 at 12:48 PM, Afshin, Bardia wrote:
Hi there,

I have a process that downloads thousands of files from s3 bucket, removes a 
set of columns from it, and upload it to s3.

S3 is currently not  the bottleneck, having a Single Master Node Spark instance 
is the bottleneck. One approach is to distribute the files on multiple Spark 
Master Node workers, that will make it faster.

yes, > 1 worker and, if the work can be partitioned


Question:

1.   Is there a way to utilize master / slave node on Spark to distribute 
this downloading and processing of files – so it can say do 10 files at a time?


yes, they are called RDDs/Dataframes & Datasets


If you are doing all the processing on the spark driver, then you aren't really 
using spark much, more just processing them in Scala

To get a dataframe

val df = spark.read.format("csv").load("s3a://bucket/data")

You now have a dataset on all files in the directory /data in the bucket, which 
will be partitioned how spark decides (which depends on: # of workers, 
compression format used and its splittability). Assuming you can configure the 
dataframe with the column structure, you can filter aggressively by selecting 
only those columns you want

val filteredDf = df.select("rental", "start_time")
filteredDf.write.save("hdfs://final/processed")

then, once you've got all the data done, copy them up to S3 via distcp

I'd recommend you start doing this with a small number of files locally, 
getting the code working, then see if you can use it with s3 as the source/dest 
of data, again, locally if you want (it's just slow), then move to in-EC2 for 
the bandwidth.

Bandwidth wise, there are some pretty major performance issues with the s3n 
connector, S3a in Hadoop 2.7+ works, with Hadoop 2.8 having a lot more 
speedup, especially when using orc and parquet as a source, where there's a
special "random access mode".

further reading
https://docs.hortonworks.com/HDPDocuments/HDCloudAWS/HDCloudAWS-1.14.1/bk_hdcloud-aws/content/s3-spark/index.html

https://docs.hortonworks.com/HDPDocuments/HDCloudAWS/HDCloudAWS-1.14.1/bk_hdcloud-aws/content/s3-performance/index.html


2.   Is there a way to scale workers with Spark downloading and processing 
files, even if they are all Single Master Node?



I think there may be some terminology confusion here. You are going to have to 
have one process which is the spark driver: either on your client machine, 
deployed somewhere in the cluster via YARN/Mesos, or running on a static 
location within a spark standalone cluster. Everything other than the driver
process is a worker, which will do the work.





Re: Questions related to writing data to S3

2017-04-24 Thread Steve Loughran

On 23 Apr 2017, at 19:49, Richard Hanson wrote:


I have a streaming job which writes data to S3. I know there are saveAs 
functions helping write data to S3. But it bundles all elements then writes out 
to S3.

use Hadoop 2.8.x binaries and the fast output stream; this will stream up data 
in blocks of 5+MB (configurable), so eliminating/reducing the upload delay in
the close(), and saving on disk space.

however, your new object isn't going to be visible until that close() call, and 
with the FS being eventually consistent, the list operation often visibly lags 
the actual object creation (or deletions, for that matter)



So my first question - Is there any way to let saveAs functions write data 
in batch or single elements instead of whole bundle?

Right now I use S3 TransferManager to upload files in batch. The code looks 
like below (sorry I don't have code at hand)

...

val manager = // initialize TransferManager...

stream.foreachRDD { rdd =>

  val elements = rdd.collect

  manager.upload...(elemnts)

}

...


I suppose there would be a problem here because the TransferManager instance is
at the driver program (Now the job is working; that may be because I run spark
as a single process). And checking on the internet, it seems it is recommended
to use foreachPartition instead, and to avoid using functions that cause
actions such as rdd.collect. So another question: what is the best practice
regarding this scenario (batch upload transformed data to external storage such
as S3)? And what functions would cause an 'action' to be triggered (like data
to be sent back to the driver program)?


once you've moved to the Hadoop 2.8 s3a client, you can just use save(path) on 
the dataframe to have it all done. S3A also manages sharing the transfer 
manager across all the workers in a process... it's trickier than you think as
you want to share the available upload bandwidth while giving some B/W to all 
threads generating output...more than one thread pool is used to handle this 
(see HADOOP-13286 for an example).
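A hedged PySpark sketch of that DataFrame route (paths and column names are made up; `stream` is the DStream from the question): each micro-batch is turned into a DataFrame and written straight to an s3a:// destination, so nothing has to be collected back to the driver and the S3A client handles the uploads.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def save_batch(time, rdd):
    if rdd.isEmpty():
        return
    # assumes the RDD holds tuples of a known shape
    df = rdd.toDF(["key", "value"])
    df.write.mode("append").parquet("s3a://my-bucket/stream-output/")

stream.foreachRDD(save_batch)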

getting those Hadoop 2.8.x binaries in is a bit tricky, because of transitive 
classpath pain; the SPARK-7481 patch shows how I do it


Re: splitting a huge file

2017-04-24 Thread Steve Loughran

> On 21 Apr 2017, at 19:36, Paul Tremblay  wrote:
> 
> We are tasked with loading a big file (possibly 2TB) into a data warehouse. 
> In order to do this efficiently, we need to split the file into smaller files.
> 
> I don't believe there is a way to do this with Spark, because in order for 
> Spark to distribute the file to the worker nodes, it first has to be split 
> up, right? 

if it is in HDFS, it's already been broken up by block size and scattered
around the filesystem, so probably split up by 128/256MB blocks, 3x replicated
each, offering lots of places for local data.

If it's in another FS, different strategies may apply, including no locality at all.

> 
> We ended up using a single machine with a single thread to do the splitting. 
> I just want to make sure I am not missing something obvious.
> 

you don't explicitly need to split up the file if you can run different workers
against different parts of the same file; what needs splitting up is the work,
not the file.

This is what org.apache.hadoop.mapreduce.InputFormat.getSplits() does: you will 
need to define an input format for your data source, and provide the split 
calculation
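If the goal really is a pile of smaller physical files rather than logical splits, here is a hedged sketch (paths and the partition count are made up): let Spark read the one big file, then write it back out over many partitions in parallel.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# one huge text file in; Spark computes the input splits itself
big = spark.read.text("hdfs:///landing/huge-file.txt")      # hypothetical path

# write it back out as roughly 2000 smaller files
big.repartition(2000).write.text("hdfs:///landing/huge-file-split/")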

> Thanks!
> 
> -- 
> Paul Henry Tremblay
> Attunix


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-12 Thread Steve Loughran

On 12 Apr 2017, at 17:25, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

Your answer is like saying, I know how to code in assembly level language and I 
am going to build the next GUI in assembly level code and I think that there is 
a genuine functional requirement to see a color of a button in green on the 
screen.


well, I reserve the right to have incomplete knowledge, and look forward to 
improving it.

Perhaps it may be pertinent to read the first preface of a CI/ CD book and 
realize to what kind of software development disciplines is it applicable to.

the original introduction on CI was probably Fowler's Cruise Control article,
https://martinfowler.com/articles/originalContinuousIntegration.html

"The key is to automate absolutely everything and run the process so often that 
integration errors are found quickly"

Java Development with Ant, 2003, looks at Cruise Control, Anthill and Gump, 
again, with that focus on team coding and automated regression testing, both of 
unit tests, and, with things like HttpUnit, web UIs. There's no discussion of 
"Data" per-se, though databases are implicit.

Apache Gump [Sam Ruby, 2001] was designed to address a single problem "get the 
entire ASF project portfolio to build and test against the latest build of 
everything else". Lots of finger pointing there, especially when something 
foundational like Ant or Xerces did bad.

AFAIK, the earliest known in-print reference to Continuous Deployment is the
HP Labs 2002 paper, Making Web Services that Work. That introduced the concept 
with a focus on automating deployment, staging testing and treating ops 
problems as use cases for which engineers could often write tests for, and, 
perhaps, even design their applications to support. "We are exploring extending 
this model to one we term Continuous Deployment —after passing the local test 
suite, a service can be automatically deployed to a public staging server for 
stress and acceptance testing by physically remote calling parties"

At this time, the applications weren't modern "big data" apps as they didn't 
have affordable storage or the tools to schedule work over it. It wasn't that 
the people writing the books and papers looked at big data and said "not for 
us", it just wasn't on their horizons. 1TB was a lot of storage in those days, 
not a high-end SSD.

Otherwise your approach is just another line of defense in saving your job by 
applying an impertinent, incorrect, and outdated skill and tool to a problem.


please be a bit more constructive here, the ASF code of conduct encourages
empathy and collaboration. https://www.apache.org/foundation/policies/conduct
Thanks.


Building data products is a very different discipline from that of building 
software.


Which is why we need to consider how to take what are core methodologies for
software and apply them, and, where appropriate, supersede them with new
workflows, ideas, technologies. But doing so with an understanding of the
reasoning behind today's tools and workflows. I'm really interested in how
we get from experimental notebook code to something usable in production,
pushing it out, finding the dirty-data problems before it goes live, etc, etc.
I do think today's tools have been outgrown by the applications we now build,
and am thinking not so much "which tools to use", but one step further, "what
are the new tools and techniques to use?".

I look forward to whatever insight people have here.


My genuine advice to everyone in all spheres of activities will be to first 
understand the problem to solve before solving it and definitely before 
selecting the tools to solve it, otherwise you will land up with a bowl of soup 
and fork in hand and argue that CI/ CD is still applicable to building data 
products and data warehousing.


I concur

Regards,
Gourav


-Steve

On Wed, Apr 12, 2017 at 12:42 PM, Steve Loughran <ste...@hortonworks.com> wrote:

On 11 Apr 2017, at 20:46, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

And once again JAVA programmers are trying to solve a data analytics and data 
warehousing problem using programming paradigms. It is genuinely a pain to see
this happen.



While I'm happy to be faulted for treating things as software processes, having 
a full automated mechanism for testing the latest code before production is 
something I'd consider foundational today. This is what "Continuous Deployment"
was about when it was first conceived. Does it mean you should blindly deploy 
that way? well, not if you worry about security, but having that review process 
and then a final manual "deploy" button can address that.

Cloud infras let you integrate cluster instantiation to the process; which 
helps you automate things l

Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-12 Thread Steve Loughran

On 11 Apr 2017, at 20:46, Gourav Sengupta wrote:

And once again JAVA programmers are trying to solve a data analytics and data 
warehousing problem using programming paradigms. It is genuinely a pain to see
this happen.



While I'm happy to be faulted for treating things as software processes, having 
a full automated mechanism for testing the latest code before production is 
something I'd consider foundational today. This is what "Continuous Deployment"
was about when it was first conceived. Does it mean you should blindly deploy 
that way? well, not if you worry about security, but having that review process 
and then a final manual "deploy" button can address that.

Cloud infras let you integrate cluster instantiation into the process, which
helps you automate things like "stage the deployment in some new VMs, run
acceptance tests (*), then switch the load balancer over to the new cluster,
being ready to switch back if you need". I've not tried that with streaming apps
though; I don't know how to do it there. Booting the new cluster off checkpointed
state requires deserialization to work, which can't be guaranteed if you are
changing the objects which get serialized.

I'd argue then, it's not a problem which has already been solved by data 
analytics/warehousing, though if you've got pointers there, I'd be grateful.
Always good to see work by others. Indeed, the telecoms industry have led the 
way in testing and HA deployment: if you look at Erlang you can see a system 
designed with hot upgrades in mind, the way java code "add a JAR to a web 
server" never was.

-Steve


(*) do always make sure this is the test cluster with a snapshot of test data, 
not production machines/data. There are always horror stories there.


Regards,
Gourav

On Tue, Apr 11, 2017 at 2:20 PM, Sam Elamin wrote:
Hi Steve


Thanks for the detailed response, I think this problem doesn't have an industry 
standard solution as of yet and I am sure a lot of people would benefit from 
the discussion

I realise now what you are saying so thanks for clarifying, that said let me 
try and explain how we approached the problem

There are 2 problems you highlighted: the first is moving the code from SCM to
prod, and the other is ensuring the data your code uses is correct (using the
latest data from prod).


"how do you get your code from SCM into production?"

We currently have our pipeline being run via airflow, we have our dags in S3, 
with regards to how we get our code from SCM to production

1) Jenkins build that builds our spark applications and runs tests
2) Once the first build is successful we trigger another build to copy the dags 
to an s3 folder

We then routinely sync this folder to the local airflow dags folder every X 
amount of mins

Re test data
" but what's your strategy for test data: that's always the troublespot."

Our application is using versioning against the data, so we expect the source 
data to be in a certain version and the output data to also be in a certain 
version

We have a test resources folder that we have following the same convention of 
versioning - this is the data that our application tests use - to ensure that 
the data is in the correct format

so for example if we have Table X with version 1 that depends on data from 
Table A and B also version 1, we run our spark application then ensure the 
transformed table X has the correct columns and row values

Then when we have a new version 2 of the source data or adding a new column in 
Table X (version 2), we generate a new version of the data and ensure the tests 
are updated

That way we ensure any new version of the data has tests against it

"I've never seen any good strategy there short of "throw it at a copy of the 
production dataset"."

I agree which is why we have a sample of the production data and version the 
schemas we expect the source and target data to look like.
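As an illustration of the kind of check being described (module, table and column names here are all hypothetical), a small pytest-style test that runs the transformation against the versioned sample data and asserts on the output schema:

from pyspark.sql import SparkSession

from myjob.transforms import build_table_x   # hypothetical transformation under test

def test_table_x_v2_transform():
    spark = SparkSession.builder.master("local[2]").getOrCreate()

    # versioned sample inputs kept under the test resources folder
    table_a = spark.read.parquet("tests/resources/v2/table_a")
    table_b = spark.read.parquet("tests/resources/v2/table_b")

    result = build_table_x(table_a, table_b)

    assert result.columns == ["id", "amount", "new_column"]
    assert result.count() > 0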

If people are interested I am happy writing a blog about it in the hopes this 
helps people build more reliable pipelines


Love to see that.

Kind Regards
Sam



Re: optimising storage and ec2 instances

2017-04-11 Thread Steve Loughran

> On 11 Apr 2017, at 11:07, Zeming Yu  wrote:
> 
> Hi all,
> 
> I'm a beginner with spark, and I'm wondering if someone could provide 
> guidance on the following 2 questions I have.
> 
> Background: I have a data set growing by 6 TB p.a. I plan to use spark to 
> read in all the data, manipulate it and build a predictive model on it (say 
> GBM) I plan to store the data in S3, and use EMR to launch spark, reading in 
> data from S3.
> 
> 1. Which option is best for storing the data on S3 for the purpose of 
> analysing it in EMR spark?
> Option A: storing the 6TB file as 173 million individual text files
> Option B: zipping up the above 173 million text files as 240,000 zip files
> Option C: appending the individual text files, so have 240,000 text files p.a.
> Option D: combining the text files even further
> 

everything works best if your sources are a few tens to hundreds of MB or more;
that way work can be partitioned up by file. If you use more structured
formats (avro compressed with snappy, etc), you can throw > 1 executor at work
inside a file. Structure is handy all round, even if it's just adding timestamp
and provenance columns to each data file.

there's the HAR file format from Hadoop which can merge lots of small files 
into larger ones, allowing work to be scheduled per har file. Recommended for 
HDFS as it hates small files, on S3 you still have limits on small files 
(including throttling of HTTP requests to shards of a bucket), but they are 
less significant.

One thing to be aware of is that the s3 clients Spark uses are very inefficient at 
listing wide directory trees, and Spark is not always the best at partitioning 
work because of this. You can accidentally create a very inefficient tree 
structure like datasets/year=2017/month=5/day=10/hour=12/, with only one file 
per hour. Listing and partitioning suffer here, and while s3a on Hadoop 2.8 is 
better, Spark hasn't yet fully adapted to those changes (use of specific 
API calls). There's also a lot more to be done in S3A to handle wildcards in 
the directory tree much more efficiently (HADOOP-13204); it needs to address 
patterns like datasets/year=201?/month=*/day=10 without treewalking and 
without fetching too much data from wildcards near the top of the tree. We need 
to avoid implementing something which works well on *my* layouts but 
absolutely dies on other people's. As is usual in OSS, help is welcome; early 
testing here is as critical as coding, so as to ensure things will work with your 
file structures.

-Steve


> 2. Any recommendations on the EMR set up to analyse the 6TB of data all at 
> once and build a GBM, in terms of
> 1) The type of EC2 instances I need?
> 2) The number of such instances I need?
> 3) Rough estimate of cost?
> 

no opinion there

> 
> Thanks so much,
> Zeming
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: unit testing in spark

2017-04-11 Thread Steve Loughran

(sorry sent an empty reply by accident)

Unit testing is one of the easiest ways to isolate problems in an internal 
class, things you can get wrong. But: time spent writing unit tests is time 
*not* spent writing integration tests. Which biases me towards the integration tests.

What I do find good is writing unit tests to debug things: if something is 
playing up and you can write a unit test to replicate it, then not only can you 
isolate the problem, you can verify it is fixed and stays fixed. And as such tests 
are fast & often runnable in parallel, they are easy to run repetitively.

But: tests have a maintenance cost, especially if the tests go into the 
internals, making them very brittle to change. Mocking is the real troublespot 
here. It's good to be able to simulate failures, but given the choice between 
"integration test against real code" and "something using mocks which produces 
'impossible' stack traces and, after a code rework, fails so badly you can't 
tell if it's a regression or just that the tests are obsolete", I'd go for the 
test against real code, even if it runs up some bills.

I really liked Lar's slides; gave me some ideas. One thing I've been exploring 
is using system metrics in testing, adding more metrics to help note what is 
happening

https://steveloughran.blogspot.co.uk/2016/04/distributed-testing-making-use-of.html

Strengths: encourages me to write metrics, can be used in in-VM tests, and 
collected from a distributed SUT integration tests, both for asserts and 
logging. Weakness1. : exposing internal state which, again, can be brittle. 2. 
in integration tests the results can vary a lot, so you can't really make 
assertions on it. Better there to collect things and use in test reports.
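
A minimal sketch of collecting such metrics in-VM, assuming Spark 2.x; it logs 
bytes read per run rather than asserting on them, for exactly the reason above:

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val bytesRead = new AtomicLong(0)
sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks, so guard it
    Option(taskEnd.taskMetrics).foreach(m => bytesRead.addAndGet(m.inputMetrics.bytesRead))
  }
})
// ... run the workload under test ...
println(s"total bytes read across tasks: ${bytesRead.get()}")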

Which brings me to a real issue with integration tests, which isn't a fault of 
the apps or the tests, but of today's test runners: log capture and reporting 
dates from the era when we were running unit tests, so think about the 
reporting problems there: standard out and error for a single process; no 
standard log format, so naive stream capture over structured log entries; test 
runners which don't report much on a failure beyond the stack trace, or, with 
scalatest, half the stack trace (*), missing out on those of the remote 
systems. Systems which, if you are playing with cloud infra, may not be there 
when you get to analyse the test results. You are left trying to compare 9 logs 
across 3 destroyed VMs to work out why the test runner threw an assertion 
failure.

This is tractable, and indeed, the Kafka people have been advocating "use kafka 
as the collector of test results" to address it: the logs, metrics, events 
raised by the SUT, etc, and then somehow correlate them into test reports, or 
at least provide the ordering of events and state across parts of the system so 
that you can work back from a test failure. Yes, that means moving way beyond 
the usual ant-JUnit XML report everything creates, but like I said: that was 
written for a different era. It's time to move on, generating the XML report as 
one of the outputs if you want, but not the one you use for diagnosing why a 
test fails.

I'd love to see what people have been up to in that area. If anyone has 
insights there, it'd be topic for a hangout.

-Steve


(*) Scalatest opinions: 
https://steveloughran.blogspot.co.uk/2016/09/scalatest-thoughts-and-ideas.html




Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-11 Thread Steve Loughran

On 7 Apr 2017, at 18:40, Sam Elamin 
<hussam.ela...@gmail.com<mailto:hussam.ela...@gmail.com>> wrote:

Definitely agree with gourav there. I wouldn't want jenkins to run my work 
flow. Seems to me that you would only be using jenkins for its scheduling 
capabilities


Maybe I was just looking at this differently

Yes you can run tests but you wouldn't want it to run your orchestration of jobs

What happens if Jenkins goes down for any particular reason? How do you have 
the conversation with your stakeholders that your pipeline is not working and 
they don't have data because the build server is going through an upgrade?



Well, I wouldn't use it as a replacement for Oozie, but I'd certainly consider it 
as the pipeline for getting your code out to the cluster, so you don't have to 
explain why you just pushed out something broken

As example, here's Renault's pipeline as discussed last week in Munich 
https://flic.kr/p/Tw3Emu

However, to be fair, I understand what you are saying Steve: if someone is in a 
place where you only have access to Jenkins and have to go through hoops to 
set up or get access to new instances, then engineers will do what they always do, 
find ways to game the system to get their work done



This isn't about trying to "Game the system", this is about what makes a 
replicable workflow for getting code into production, either at the press of a 
button or as part of a scheduled "we push out an update every night, rerun the 
deployment tests and then switch over to the new installation" mech.

Put differently: how do you get your code from SCM into production? Not just 
for CI, but what's your strategy for test data: that's always the troublespot. 
Random selection of rows may work, although it will skip the odd outlier 
(high-unicode char in what should be a LATIN-1 field, time set to 0, etc), and 
for work joining > 1 table, you need rows which join well. I've never seen any 
good strategy there short of "throw it at a copy of the production dataset".


-Steve






On Fri, 7 Apr 2017 at 16:17, Gourav Sengupta 
<gourav.sengu...@gmail.com<mailto:gourav.sengu...@gmail.com>> wrote:
Hi Steve,

Why would you ever do that? You are suggesting the use of a CI tool as a 
workflow and orchestration engine.

Regards,
Gourav Sengupta

On Fri, Apr 7, 2017 at 4:07 PM, Steve Loughran 
<ste...@hortonworks.com<mailto:ste...@hortonworks.com>> wrote:
If you have Jenkins set up for some CI workflow, that can do scheduled builds 
and tests. Works well if you can do some build test before even submitting it 
to a remote cluster

On 7 Apr 2017, at 10:15, Sam Elamin 
<hussam.ela...@gmail.com<mailto:hussam.ela...@gmail.com>> wrote:

Hi Shyla

You have multiple options really some of which have been already listed but let 
me try and clarify

Assuming you have a spark application in a jar you have a variety of options

You have to have an existing spark cluster that is either running on EMR or 
somewhere else.

Super simple / hacky
Cron job on EC2 that calls a simple shell script that does a spark-submit to a 
Spark Cluster, OR creates or adds a step to an EMR cluster

More Elegant
Airflow/Luigi/AWS Data Pipeline (Which is just CRON in the UI ) that will do 
the above step but have scheduling and potential backfilling and error 
handling(retries,alerts etc)

AWS are coming out with Glue <https://aws.amazon.com/glue/> soon that does some 
Spark jobs, but I do not think it's available worldwide just yet

Hope I cleared things up

Regards
Sam


On Fri, Apr 7, 2017 at 6:05 AM, Gourav Sengupta 
<gourav.sengu...@gmail.com<mailto:gourav.sengu...@gmail.com>> wrote:
Hi Shyla,

why would you want to schedule a spark job in EC2 instead of EMR?

Regards,
Gourav

On Fri, Apr 7, 2017 at 1:04 AM, shyla deshpande 
<deshpandesh...@gmail.com<mailto:deshpandesh...@gmail.com>> wrote:
I want to run a spark batch job maybe hourly on AWS EC2 .  What is the easiest 
way to do this. Thanks







Re: Does Spark uses its own HDFS client?

2017-04-07 Thread Steve Loughran

On 7 Apr 2017, at 15:32, Alvaro Brandon 
> wrote:

I was going through the SparkContext.textFile() and I was wondering at that 
point does Spark communicates with HDFS. Since when you download Spark binaries 
you also specify the Hadoop version you will use, I'm guessing it has its own 
client that calls HDFS wherever you specify it in the configuration files.



it uses the hadoop-hdfs JAR in the spark-assembly JAR or the lib dir under 
SPARK_HOME. Nobody would ever want to write their own HDFS client, not if you look 
at the bits of the code related to Kerberos. webhdfs:// is something you 
could write a client for, though it's not done here.


The goal is to instrument and log all the calls that Spark does to HDFS. Which 
class or classes perform these operations?



org.apache.hadoop.hdfs.DistributedFileSystem

Take a look at HTrace here: 
https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/Tracing.html
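
If full tracing is more than you need, the per-scheme counters the FileSystem layer 
already keeps are a lightweight way to log what a job did. A sketch (these counters 
are per-JVM, so running it in the driver only covers driver-side calls; executors 
keep their own):

import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

FileSystem.getAllStatistics.asScala.foreach { s =>
  println(s"${s.getScheme}: bytesRead=${s.getBytesRead} bytesWritten=${s.getBytesWritten} " +
    s"readOps=${s.getReadOps} writeOps=${s.getWriteOps}")
}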






Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-07 Thread Steve Loughran
If you have Jenkins set up for some CI workflow, that can do scheduled builds 
and tests. Works well if you can do some build test before even submitting it 
to a remote cluster

On 7 Apr 2017, at 10:15, Sam Elamin 
> wrote:

Hi Shyla

You have multiple options really some of which have been already listed but let 
me try and clarify

Assuming you have a spark application in a jar you have a variety of options

You have to have an existing spark cluster that is either running on EMR or 
somewhere else.

Super simple / hacky
Cron job on EC2 that calls a simple shell script that does a spark-submit to a 
Spark Cluster, OR creates or adds a step to an EMR cluster

More Elegant
Airflow/Luigi/AWS Data Pipeline (Which is just CRON in the UI ) that will do 
the above step but have scheduling and potential backfilling and error 
handling(retries,alerts etc)

AWS are coming out with Glue soon that does some 
Spark jobs, but I do not think it's available worldwide just yet

Hope I cleared things up

Regards
Sam


On Fri, Apr 7, 2017 at 6:05 AM, Gourav Sengupta 
> wrote:
Hi Shyla,

why would you want to schedule a spark job in EC2 instead of EMR?

Regards,
Gourav

On Fri, Apr 7, 2017 at 1:04 AM, shyla deshpande 
> wrote:
I want to run a spark batch job maybe hourly on AWS EC2 .  What is the easiest 
way to do this. Thanks





Re: httpclient conflict in spark

2017-03-30 Thread Steve Loughran

On 29 Mar 2017, at 14:42, Arvind Kandaswamy 
> wrote:

Hello,

I am getting the following error. I get this error when trying to use AWS S3. 
This appears to be a conflict with httpclient. AWS S3 comes with 
httpclient-4.5.2.jar. I am not sure how to force spark to use this version. I 
have tried spark.driver.userClassPathFirst = true, 
spark.executor.userClassPathFirst=true. Did not help. I am using Zeppelin to 
call the spark engine in case if that is an issue.


Spark 2.x ships with httpclient 4.5.2; there is no conflict there. If you are 
on 1.6, you could actually try bumping the httpclient and httpcomponents versions 
to be consistent:

httpclient 4.5.2
httpcore 4.4.4

It's important to have org.apache.httpcomponents / httpcore in sync with 
httpclient; it's probably there where your problem is arising
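
If you build with sbt (an assumption; with Maven you'd pin the same versions in 
dependencyManagement), a sketch of forcing the pair to stay in step looks like:

// build.sbt (sbt 1.x style): keep httpclient and httpcore versions consistent
dependencyOverrides ++= Seq(
  "org.apache.httpcomponents" % "httpclient" % "4.5.2",
  "org.apache.httpcomponents" % "httpcore"   % "4.4.4"
)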

Is there anything else that I can try?

java.lang.NoSuchMethodError: 
org.apache.http.conn.ssl.SSLConnectionSocketFactory.<init>(Ljavax/net/ssl/SSLContext;Ljavax/net/ssl/HostnameVerifier;)V
at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.<init>(SdkTLSSocketFactory.java:56)
at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.getPreferredSocketFactory(ApacheConnectionManagerFactory.java:92)
at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.create(ApacheConnectionManagerFactory.java:65)
at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.create(ApacheConnectionManagerFactory.java:58)
at com.amazonaws.http.apache.client.impl.ApacheHttpClientFactory.create(ApacheHttpClientFactory.java:51)
at com.amazonaws.http.apache.client.impl.ApacheHttpClientFactory.create(ApacheHttpClientFactory.java:39)
at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:314)
at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:298)
at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:165)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:583)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:563)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:541)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235)



Re: Spark and continuous integration

2017-03-14 Thread Steve Loughran

On 13 Mar 2017, at 13:24, Sam Elamin 
> wrote:

Hi Jorn

Thanks for the prompt reply; really we have 2 main concerns with CD: ensuring 
tests pass, and linting the code.

I'd add "providing diagnostics when tests fail", which is a combination of: 
tests providing useful information and CI tooling collecting all those results 
and presenting them meaningfully. The hard parts are invariably (at least for 
me)

-what to do about the intermittent failures
-tradeoff between thorough testing and fast testing, especially when thorough 
means "better/larger datasets"

You can consider the output of jenkins & tests as data sources for your own 
analysis too: track failure rates over time, test runs over time, etc: could be 
interesting. If you want to go there, then the question becomes "which CI toolings 
produce the most interesting machine-parseable results, above and beyond the 
classic Ant-originated XML test run reports".

I have mixed feelings about scalatest there: I think the expression language is 
good, but the maven test runner doesn't report that well, at least for me:

https://steveloughran.blogspot.co.uk/2016/09/scalatest-thoughts-and-ideas.html



I think all platforms should handle this with ease, I was just wondering what 
people are using.

Jenkins seems to have the best spark plugins so we are investigating that as 
well as a variety of other hosted CI tools

Happy to write a blog post detailing our findings and sharing it here if people 
are interested


Regards
Sam

On Mon, Mar 13, 2017 at 1:18 PM, Jörn Franke 
> wrote:
Hi,

Jenkins also now supports pipeline as code and multibranch pipelines. thus you 
are not so dependent on the UI and you do not need anymore a long list of jobs 
for different branches. Additionally it has a new UI (beta) called blueocean, 
which is a little bit nicer. You may also check GoCD. Aside from this you have 
a huge variety of commercial tools, e.g. Bamboo.
In the cloud, I use for my open source github projects Travis-Ci, but there are 
also a lot of alternatives, e.g. Distelli.

It really depends what you expect, e.g. If you want to Version the build 
pipeline in GIT, if you need Docker deployment etc. I am not sure if new 
starters should be responsible for the build pipeline, thus I am not sure that 
i understand  your concern in this area.

From my experience, integration tests for Spark can be run on any of these 
platforms.

Best regards

> On 13 Mar 2017, at 10:55, Sam Elamin 
> > wrote:
>
> Hi Folks
>
> This is more of a general question. What's everyone using for their CI /CD 
> when it comes to spark
>
> We are using Pyspark but potentially looking to make to spark scala and Sbt 
> in the future
>
>
> One of the suggestions was jenkins but I know the UI isn't great for new 
> starters so I'd rather avoid it. I've used team city but that was more 
> focused on dot net development
>
>
> What are people using?
>
> Kind Regards
> Sam




Re: Wrong runtime type when using newAPIHadoopFile in Java

2017-03-06 Thread Steve Loughran

On 6 Mar 2017, at 12:30, Nira Amit 
> wrote:

 And it's very difficult if it's doing unexpected things.

All serialisations do unexpected things. Nobody understands them. Sorry



Re: Custom log4j.properties on AWS EMR

2017-02-26 Thread Steve Loughran
Try giving a resource of a file in the JAR, e.g. add a file 
"log4j-debugging.properties" into the jar, and give a config option of 
-Dlog4j.configuration=/log4j-debugging.properties (maybe also try without the 
"/").


On 26 Feb 2017, at 16:31, Prithish 
> wrote:

Hoping someone can answer this.

I am unable to override and use a Custom log4j.properties on Amazon EMR. I am 
running Spark on EMR (Yarn) and have tried all the below combinations in the 
Spark-Submit to try and use the custom log4j.

In Client mode
--driver-java-options 
"-Dlog4j.configuration=hdfs://host:port/user/hadoop/log4j.properties"

In Cluster mode
--conf 
"spark.driver.extraJavaOptions=-Dlog4j.configuration=hdfs://host:port/user/hadoop/log4j.properties"

I have also tried picking from local filesystem using file: instead of 
hdfs. None of this seem to work. However, I can get this working when running 
on my local Yarn setup.

Any ideas?

I have also posted on Stackoverflow (link below)
http://stackoverflow.com/questions/42452622/custom-log4j-properties-on-aws-emr



Re: Get S3 Parquet File

2017-02-25 Thread Steve Loughran

On 24 Feb 2017, at 07:47, Femi Anthony 
> wrote:

Have you tried reading using s3n which is a slightly older protocol ? I'm not 
sure how compatible s3a is with older versions of Spark.

I would absolutely not use s3n with a 1.2 GB file.

There is a WONTFIX JIRA on how it will read to the end of a file when you close 
a stream, and as seek() closes a stream, every seek will read to the end of the 
file. And as readFully(position, bytes) does a seek at either end, every time the 
Parquet code tries to read a bit of data you get another 1.3 GB of download: 
https://issues.apache.org/jira/browse/HADOOP-12376

That is not going to be fixed, ever, because it can only be done by upgrading 
the libraries, and that will simply move new bugs in, lead to different 
bug reports, etc, etc. All for a piece of code which has been supplanted in the 
hadoop-2.7.x JARs with s3a ready for use, and in the forthcoming hadoop-2.8+ 
code, significantly faster for IO (especially ORC/Parquet), multi-GB uploads, 
and even the basic metadata operations used when setting up queries.

For Hadoop 2.7+, use S3a. Any issues with s3n will be closed as  "use s3a"




Femi

On Fri, Feb 24, 2017 at 2:18 AM, Benjamin Kim 
> wrote:
Hi Gourav,

My answers are below.

Cheers,
Ben


On Feb 23, 2017, at 10:57 PM, Gourav Sengupta 
> wrote:

Can I ask where are you running your CDH? Is it on premise or have you created 
a cluster for yourself in AWS? Our cluster is on premise, in our data center.


You need to set up your s3a credentials in core-site, spark-defaults, or rely 
on spark-submit picking up the submitter's AWS env vars and propagating them.
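
A minimal sketch of the spark-defaults/SparkConf route; the spark.hadoop.* prefix 
gets the keys copied into the Hadoop configuration the s3a client reads (the env 
var names are the standard AWS ones, the rest is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
  .set("spark.hadoop.fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))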


Also I have really never seen use s3a before, that was used way long before 
when writing s3 files took a long time, but I think that you are reading it.

Any ideas why you are not migrating to Spark 2.1? Besides speed, there are lots 
of apis which are new and the existing ones are being deprecated. Therefore 
there is a very high chance that you are already working on code which is being 
deprecated by the SPARK community right now. We use CDH and upgrade with 
whatever Spark version they include, which is 1.6.0. We are waiting for the 
move to Spark 2.0/2.1.

This is in the Hadoop codebase, not the Spark release. It will be the same 
irrespective of the Spark version.


And besides that, would you not want to work on a platform which is at least 10 
times faster? What would that be?

Regards,
Gourav Sengupta

On Thu, Feb 23, 2017 at 6:23 PM, Benjamin Kim 
> wrote:
We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB Parquet 
file from AWS S3. We can read the schema and show some data when the file is 
loaded into a DataFrame, but when we try to do some operations, such as count, 
we get this error below.

com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS 
credentials from any provider in the chain
at 
com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at 
com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
at 
com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
at 
com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2730)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at 
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
at 
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
at 
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:145)
at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:180)
at 
org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at 

Re: Will Spark ever run the same task at the same time

2017-02-20 Thread Steve Loughran

> On 16 Feb 2017, at 18:34, Ji Yan  wrote:
> 
> Dear spark users,
> 
> Is there any mechanism in Spark that does not guarantee the idempotent 
> nature? For example, for stragglers, the framework might start another task 
> assuming the straggler is slow while the straggler is still running. This 
> would be annoying sometime when say the task is writing to a file, but have 
> the same tasks running at the same time may corrupt the file. From the 
> documentation page, I know that Spark's speculative execution mode is turned 
> off by default. Does anyone know any other mechanism in Spark that may cause 
> problem in scenario like this?

 It's not so much "two tasks writing to the same file" as "two tasks writing to 
different places with the work renamed into place at the end".

speculation is the key case when there's >1  writer, though they do write to 
different directories; the spark commit protocol guarantees that only the 
committed task gets its work into the final output.

Some failure modes *may* have >1 executor running the same work, right up to 
the point where the task commit operation is started. More specifically, a 
network partition may cause the executor to lose touch with the driver, and the 
driver to pass the same task on to another executor, while the existing 
executor keeps going. Its when that first executor tries to commit the data 
that you get a guarantee that the work doesn't get committed (no connectivity 
=> no commit, connectivity resumed => driver will tell executor it's been 
aborted).

If you are working with files outside of the tasks' working directory, then the 
outcome of failure will be "undefined". The FileCommitProtocol lets you  ask 
for a temp file which is rename()d to the destination in the commit. Use this 
and the files will only appear once the task is committed. Even there, there is a 
small, but non-zero chance that the commit may fail partway through, in which 
case the outcome is, as they say, "undefined". Avoid that today by not manually 
adding custom partitions to data sources in your hive metastore. 
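
For code writing files outside the committer, a minimal sketch of the 
write-to-temp-then-rename pattern using the plain Hadoop FileSystem API (not 
Spark's internal FileCommitProtocol; paths and the payload are illustrative, and 
remember rename() is not atomic on S3):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

val fs    = FileSystem.get(new URI("hdfs://nn:8020/"), sc.hadoopConfiguration)
val tmp   = new Path("/output/_tmp/part-0000")
val dest  = new Path("/output/part-0000")
val bytes = "task output".getBytes("UTF-8")        // whatever the task produced
val out   = fs.create(tmp)
try { out.write(bytes) } finally { out.close() }
// only now does the file become visible at its final name
if (!fs.rename(tmp, dest)) throw new java.io.IOException(s"rename $tmp -> $dest failed")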

Steve




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: fault tolerant dataframe write with overwrite

2017-02-14 Thread Steve Loughran

On 14 Feb 2017, at 11:12, Mendelson, Assaf 
> wrote:

I know how to get the filesystem, the problem is that this means using Hadoop 
directly so if in the future we change to something else (e.g. S3) I would need 
to rewrite the code.

well, no, because the s3 and hdfs clients use the same API

FileSystem fs = FileSystem.get(new URI("hdfs://nn:8020/users/stevel"), conf);

vs

FileSystem fs = FileSystem.get(new URI("s3a://bucket1/dataset"), conf);

same for wasb:// (which, being consistent and with fast atomic rename, can be 
used instead of HDFS), and other cluster filesystems. If it's a native fs, then 
file:// should work everywhere, or some derivative (as Red Hat do with gluster)

This also relate to finding the last iteration, I would need to use Hadoop 
filesystem which is not agnostic to the deployment.


see above. If you are using a Spark cluster of size > 1 you will need some 
distributed filesystem, which is going to have to provide a shared, consistent 
view of the data to every node.

If there is an issue here, it is that if you rely on FileSystem.rename() being 
an atomic O(1) operation then you are going to be disappointed on S3, as it's a 
non-atomic O(data) copy & delete whose failure state is "undefined".


The solution here comes from having specific committer logic for the different 
object stores. You really, really don't want to go there. If you do, have a 
start by looking at the S3guard WiP one: 
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md

further reading: 
http://www.slideshare.net/steve_l/spark-summit-east-2017-apache-spark-and-object-stores

The Kryo serializer still costs much more than dataframe write.

As for the use case, I am doing a very large number of iterations. So the idea 
is that every X iterations I want to save to disk so that if something crashes 
I do not have to begin from the first iteration but just from the relevant 
iteration.


Sounds like you don't really want the output to always be the FS, more a 
checkpointing of iterations. Couldn't you do something like: every 20 iterations, 
write() the relevant RDD or DataFrame to the DFS?
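
A minimal sketch of that, assuming Spark 2.x with a SparkSession named `spark`; 
step(), the base path and the interval are illustrative. Re-reading the snapshot 
truncates the lineage, so a crash costs at most the iterations since the last 
snapshot:

var df = initialDf
for (i <- 1 to numIterations) {
  df = step(df)                                    // one iteration of the algorithm
  if (i % 20 == 0) {
    val snapshot = s"$basePath/iteration=$i"
    df.write.mode("overwrite").parquet(snapshot)   // each snapshot gets its own directory
    df = spark.read.parquet(snapshot)              // re-read to cut the lineage
  }
}

Because each snapshot goes to its own directory, the previous good snapshot is 
never deleted before the new one is safely written, which was the original concern.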


Basically I would have liked to see something like saving normally and the 
original data would not be removed until a successful write.
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Tuesday, February 14, 2017 12:54 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: fault tolerant dataframe write with overwrite

Normally you can fetch the filesystem interface from the configuration ( I 
assume you mean URI).
Managing to get the last iteration: I do not understand the issue. You can have 
as the directory the current timestamp and at the end you simply select the 
directory with the highest number.

Regarding checkpointing, you can also use the Kryo serializer to avoid some space 
overhead.

Aside from that, can you elaborate on the use case why you need to write every 
iteration?

On 14 Feb 2017, at 11:22, Mendelson, Assaf 
> wrote:
Hi,

I have a case where I have an iterative process which overwrites the results of 
a previous iteration.
Every iteration I need to write a dataframe with the results.
The problem is that when I write, if I simply overwrite the results of the 
previous iteration, this is not fault tolerant. i.e. if the program crashes in 
the middle of an iteration, the data from previous ones is lost as overwrite 
first removes the previous data and then starts writing.

Currently we simply write to a new directory and then rename but this is not 
the best way as it requires us to know the interfaces to the underlying file 
system (as well as requiring some extra work to manage which is the last one 
etc.)
I know I can also use checkpoint (although I haven’t fully tested the process 
there), however, checkpointing converts the result to RDD which both takes more 
time and more space.
I was wondering if there is any efficient method of managing this from inside 
spark.
Thanks,
Assaf.



Re: How to measure IO time in Spark over S3

2017-02-13 Thread Steve Loughran
Hadoop 2.8's s3a does a lot more metrics here, most of which you can find on 
HDP-2.5 if you can grab those JARs. Everything comes out as hadoop JMX metrics, 
also readable & aggregatable through a call to FileSystem.getStorageStatistics
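
A sketch of pulling those numbers out at the end of a job, assuming the Hadoop 
2.8+ JARs are on the classpath (bucket name illustrative):

import java.net.URI
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

val fs = FileSystem.get(new URI("s3a://mybucket/"), sc.hadoopConfiguration)
// dumps the s3a counters: request counts, bytes read/written, seek/abort stats, etc.
fs.getStorageStatistics.getLongStatistics.asScala.foreach { s =>
  println(s"${s.getName} = ${s.getValue}")
}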


Measuring IO time isn't something picked up, because it's actually hard to 
measure in a multithreaded world: you can't just add up the seconds spent 
talking to s3, because doing so can make things seem unduly negative. I know 
that because we did try adding up upload times & bytes uploaded to give a metric of 
bandwidth, but it turns out to be fairly misleading. If 10 threads each took 
60s to upload a megabyte of data, summing the times would suggest the bandwidth is 
1 MB a minute... but if they were running in parallel it's actually 10 MB a minute: 
10x as fast.


FWIW the main bottlenecks in s3a perf are


1. time for metadata operations. This is shockingly bad: 
http://steveloughran.blogspot.co.uk/2016/12/how-long-does-filesystemexists-take.html
  As well as code improvements up the stack, you can help here by not having 
deep directory structures for partitioning; prefer wider trees.

2.  cost of re-opening HTTPS connection after forward/backward seek. Hadoop 2.8 
s3a does a lot of work here to improve things, through forward seeks of many KB 
before abort/restart the connectin, and an fadvise=random option for max perf 
on column table storage (ORC, Parquet)

3. how s3a waits until close() before uploading data. The 
fs.s3a.fast.upload=true option boosts this, but it's pretty brittle in 
Hadoop 2.7.x as it uses lots of on-heap storage if code is generating data faster 
than the upload B/W; 2.8 can use HDD as buffering.

4.  time to commit data. This is an O(data) copy server side, at 6-10 MB/s. 
It needs a committer which doesn't do renames. The Spark 1.6 DirectOutputCommitter 
avoided them, but it couldn't handle failure & retry. Future ones will.


-Steve



From: Gili Nachum 
Sent: 13 February 2017 06:55
To: user@spark.apache.org
Subject: How to measure IO time in Spark over S3

Hi!

How can I tell IO duration for a Spark application doing R/W from S3 (using S3 
as a filesystem sc.textFile("s3a://...")?
I would like to know the % of time doing IO of the overall app execution time.

Gili.


Re: using an alternative slf4j implementation

2017-02-06 Thread Steve Loughran

> On 6 Feb 2017, at 11:06, Mendelson, Assaf  wrote:
> 
> Found some questions (without answers) and I found some jira 
> (https://issues.apache.org/jira/browse/SPARK-4147 and 
> https://issues.apache.org/jira/browse/SPARK-14703), however they do not solve 
> the issue.
> Nominally, a library should not explicitly set a binding, however spark, does 
> so (I imagine this is so spark-submit can package everything and have a 
> logger). If a dependency does this, the nominal solution would be to exclude 
> the binding (and maybe add a relevant bridge), however, since spark adds the 
> relevant jars in spark-submit this I don't see how to do this.
> 
> Is there any way of forcing logback as the binding?
> 

What happens if you just take the log4j stuff out of SPARK_HOME/lib and drop in 
the binding you want?

> -Original Message-
> From: Jacek Laskowski [mailto:ja...@japila.pl] 
> Sent: Monday, February 06, 2017 10:46 AM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: using an alternative slf4j implementation
> 
> Hi,
> 
> Sounds like a quite involved development for me. I can't help here.
> I'd suggest going through the dev and user mailing lists for the past year 
> and JIRA issues regarding the issue as I vaguely remember some discussions 
> about logging in Spark (that would merit to do the migration to logback 
> eventually).
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Mon, Feb 6, 2017 at 9:06 AM, Mendelson, Assaf  
> wrote:
>> Shading doesn’t help (we already shaded everything).
>> 
>> According to https://www.slf4j.org/codes.html#multiple_bindings only 
>> one binding can be used. The problem is that once we link to spark 
>> jars then we automatically inherit spark’s binding (for log4j).
>> 
>> I would like to find a way to either send spark’s logs to log4j and my 
>> logs to logback or send everything to logback.
>> 
>> Assaf.
>> 
>> 
>> 
>> From: Jacek Laskowski [mailto:ja...@japila.pl]
>> Sent: Monday, February 06, 2017 12:47 AM
>> To: Mendelson, Assaf
>> Cc: user
>> Subject: Re: using an alternative slf4j implementation
>> 
>> 
>> 
>> Hi,
>> 
>> 
>> 
>> Shading conflicting dependencies?
>> 
>> 
>> 
>> Jacek
>> 
>> 
>> 
>> On 5 Feb 2017 3:56 p.m., "Mendelson, Assaf"  wrote:
>> 
>> Hi,
>> 
>> Spark seems to explicitly use log4j.
>> 
>> This means that if I use an alternative backend for my application (e.g.
>> ch.qos.logback) I have a conflict.
>> 
>> Sure I can exclude logback but that means my application cannot use 
>> our internal tools.
>> 
>> 
>> 
>> Is there a way to use logback as a backend logging while using spark?
>> 
>> Assaf.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark 2.02 error when writing to s3

2017-01-28 Thread Steve Loughran

On 27 Jan 2017, at 23:17, VND Tremblay, Paul 
> wrote:

Not sure what you mean by "a consistency layer on top." Any explanation would 
be greatly appreciated!

Paul



netflix's s3mper: https://github.com/Netflix/s3mper

EMR consistency: 
http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html

AWS S3: s3guard (Wip) : https://issues.apache.org/jira/browse/HADOOP-13345

All of these do the same thing: use amazon DynamoDB for storing all the 
metadata, guaranteeing that every client gets a consistent view of deletes, 
adds, and the listings returned match the state of the system. otherwise list 
commands tend to lag changes, meaning deleted files are still mistakenly 
considered as being there, and lists of paths can miss out newly created files. 
That means that there's no guarantee that the commit-by-rename protocol used in 
Hadoop FileOutputFormat may miss out files to rename, so lose results.

S3guard will guarantee that listing is consistent, and will be a precursor to 
the 0-rename committer I'm working on, which needs that consistent list to find 
the .pending files listing outstanding operations to commit.




Re: spark 2.02 error when writing to s3

2017-01-27 Thread Steve Loughran
OK

Nobody should be committing output directly to S3 without having something add 
a consistency layer on top, not if you want reliable (as in "doesn't 
lose/corrupt data" reliable) work

On 26 Jan 2017, at 19:09, VND Tremblay, Paul 
<tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>> wrote:

This seems to have done the trick, although I am not positive. If I have time, 
I'll test spinning up a cluster with and without consistent view to pin point 
the error.

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Neil Jonkers [mailto:neilod...@gmail.com]
Sent: Friday, January 20, 2017 11:39 AM
To: Steve Loughran; VND Tremblay, Paul
Cc: Takeshi Yamamuro; user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: spark 2.02 error when writing to s3

Can you test by enabling emrfs consistent view and use s3:// uri.

http://docs.aws.amazon.com/emr/latest/ManagementGuide/enable-consistent-view.html

---- Original message ----
From: Steve Loughran
Date:20/01/2017 21:17 (GMT+02:00)
To: "VND Tremblay, Paul"
Cc: Takeshi Yamamuro ,user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: spark 2.02 error when writing to s3

AWS S3 is eventually consistent: even after something is deleted, a LIST/GET 
call may show it. You may be seeing that effect; even after the DELETE has got 
rid of the files, a listing sees something there, And I suspect the time it 
takes for the listing to "go away" will depend on the total number of entries 
underneath, as there are more deletion markers "tombstones" to propagate around 
s3

Try deleting the path and then waiting a short period


On 20 Jan 2017, at 18:54, VND Tremblay, Paul 
<tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>> wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.

Thanks

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_


From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul 
<tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>> wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv



My code is this:

new_rdd\
135 .map(add_date_diff)\
136 .map(sid_offer_days)\
137 .groupByKey()\
138 .map(custom_sort)\
139 .map(before_rev_date)\
140 .map(lambda x, num_weeks = args.num_weeks: create_columns(x, 
num_weeks))\
141 .toDF()\
142 .write.csv(
143 sep = "|",
144 header = True,
145 nullValue = '',
146 quote = None,
147 path = path
148 )

In order to get the path (the last argument), I call this function:

150 def _get_s3_write(test):
151 if s3_utility.s3_data_already_exists(_get_write_bucket_name(), 
_get_s3_write_dir(test)):
152 s3_utility.remove_s3_dir(_get_write_bucket_name(), 
_get_s3_write_dir(test))
153 return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any transformations)

* If I do the previous step with a smaller set of data, I don't get the error.

* I am using pyspark, with python 2.7

* The thread at this link: 
https://forums.aws.amazon.com/thread.jspa?threadID=152470  Indicates the 
problem is caused by a sync problem. With large datasets, spark tries 
to write multiple times and causes the error. The suggestion is to turn off 
speculation, but I believe speculation is turned off by default in pyspark.

Thanks!

Paul


__

Re: spark 2.02 error when writing to s3

2017-01-20 Thread Steve Loughran
AWS S3 is eventually consistent: even after something is deleted, a LIST/GET 
call may show it. You may be seeing that effect; even after the DELETE has got 
rid of the files, a listing sees something there, And I suspect the time it 
takes for the listing to "go away" will depend on the total number of entries 
underneath, as there are more deletion markers "tombstones" to propagate around 
s3

Try deleting the path and then waiting a short period


On 20 Jan 2017, at 18:54, VND Tremblay, Paul 
> wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.

Thanks

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul 
> wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv



My code is this:

new_rdd\
135 .map(add_date_diff)\
136 .map(sid_offer_days)\
137 .groupByKey()\
138 .map(custom_sort)\
139 .map(before_rev_date)\
140 .map(lambda x, num_weeks = args.num_weeks: create_columns(x, 
num_weeks))\
141 .toDF()\
142 .write.csv(
143 sep = "|",
144 header = True,
145 nullValue = '',
146 quote = None,
147 path = path
148 )

In order to get the path (the last argument), I call this function:

150 def _get_s3_write(test):
151 if s3_utility.s3_data_already_exists(_get_write_bucket_name(), 
_get_s3_write_dir(test)):
152 s3_utility.remove_s3_dir(_get_write_bucket_name(), 
_get_s3_write_dir(test))
153 return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any transformations)

* If I do the previous step with a smaller set of data, I don't get the error.

* I am using pyspark, with python 2.7

* The thread at this link: 
https://forums.aws.amazon.com/thread.jspa?threadID=152470  Indicates the 
problem is caused by a sync problem. With large datasets, spark tries 
to write multiple times and causes the error. The suggestion is to turn off 
speculation, but I believe speculation is turned off by default in pyspark.

Thanks!

Paul


_

Paul Tremblay
Analytics Specialist

THE BOSTON CONSULTING GROUP
STL ▪

Tel. + ▪ Mobile +
tremblay.p...@bcg.com
_

Read BCG's latest insights, analysis, and viewpoints at 
bcgperspectives.com



The Boston Consulting Group, Inc.

This e-mail message may contain confidential and/or privileged information. If 
you are not an addressee or otherwise authorized to receive this message, you 
should not use, copy, disclose or take any action based on this e-mail or any 
information contained in the message. If you have received this material in 
error, please advise the sender immediately by reply e-mail and delete this 
message. Thank you.



--
---
Takeshi Yamamuro



Re: "Unable to load native-hadoop library for your platform" while running Spark jobs

2017-01-20 Thread Steve Loughran

On 19 Jan 2017, at 10:59, Sean Owen 
> wrote:

It's a message from Hadoop libs, not Spark. It can be safely ignored. It's just 
saying you haven't installed the additional (non-Apache-licensed) native libs 
that can accelerate some operations. This is something you can easily have read 
more about online.


I'd worry about it in production, as it means that the higher performance code for 
compression and HDFS encryption at rest (if used) isn't there, and it will fall 
back to the Java libraries. Which aren't as bad as you'd think, at least for 
decompression, but they do tend to underperform.

for dev/test/local work, you can turn off the warning entirely

log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR



On Thu, Jan 19, 2017 at 10:57 AM Md. Rezaul Karim 
> wrote:
Hi All,

I'm the getting the following WARNING while running Spark jobs  in standalone 
mode:
Unable to load native-hadoop library for your platform... using builtin-java 
classes where applicable

Please note that I have configured the native path and the other ENV variables 
as follows:
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_COMMON_LIB_NATIVE_DIR=/usr/local/hadoop/lib/native
export LD_LIBRARY_PATH=/usr/local/hadoop/lib/native/:$LD_LIBRARY_PATH
export JAVA_LIBRARY_PATH=/usr/local/hadoop/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"


Although my Spark job executes successfully and writes the results to a file at 
the end. However, I am not getting any logs to track the progress.

Could someone help me to solve this problem?




Regards,
_
Md. Rezaul Karim, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html



Re: Anyone has any experience using spark in the banking industry?

2017-01-20 Thread Steve Loughran

> On 18 Jan 2017, at 21:50, kant kodali  wrote:
> 
> Anyone has any experience using spark in the banking industry? I have couple 
> of questions.

> 2. How can I make spark cluster highly available across multi datacenter? Any 
> pointers?


That's not, AFAIK, been a design goal. The communications and scheduling for 
spark assume that (a) there's low latency between executors and driver, and (b) 
that data is close enough to any executor that you don't have to get placement 
right: if you can't schedule work near the data, then running it on other nodes 
is better than not running work. oh, and failure modes are that of a single 
cluster: node and rack failures, not a single long-haul connection which could 
cut the entire cluster in half. If that happens, then all work running on the 
cluster without the driver is lost: you don't have the failure resilience you 
wanted, as it means that if the cluster with the driver actually failed, then 
the other cluster would not be able to take over.

Similarly, cluster filesystems tend to assume they are single DC, with its 
failure modes. life is more complex across two sites. I do know HDFS doesn't 
handle it, though there are things that do.

I would try to come up with a strategy for having separate applications running 
on the different DCs, with a story for data replication and reconciliation.

Even there, though, there'll inevitably be an SPOF. How do you find it? You 
Wait: it will find you.

-steve

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark GraphFrame ConnectedComponents

2017-01-06 Thread Steve Loughran

On 5 Jan 2017, at 21:10, Ankur Srivastava 
> wrote:

Yes I did try it out and it choses the local file system as my checkpoint 
location starts with s3n://

I am not sure how can I make it load the S3FileSystem.

set fs.default.name to s3n://whatever, or, in the spark context, 
spark.hadoop.fs.default.name

However

1. you should really use s3a, if you have the hadoop 2.7 JARs on your classpath.
2. neither s3n nor s3a are real filesystems, and certain assumptions that 
checkpointing code tends to make (renames being O(1) atomic calls) do not hold. 
It may be that checkpointing to s3 isn't as robust as you'd like
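
A minimal sketch of wiring that up (bucket name illustrative, and per point 1 it 
uses s3a; fs.defaultFS is the current name for the deprecated fs.default.name key):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.hadoop.fs.defaultFS", "s3a://mybucket")   // default FS seen via sc.hadoopConfiguration
val sc = new SparkContext(conf)
sc.setCheckpointDir("s3a://mybucket/checkpoints")        // where the checkpoint files will go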




On Thu, Jan 5, 2017 at 12:12 PM, Felix Cheung 
> wrote:
Right, I'd agree, it seems to be only with delete.

Could you by chance run just the delete to see if it fails

FileSystem.get(sc.hadoopConfiguration)
.delete(new Path(somepath), true)

From: Ankur Srivastava 
>
Sent: Thursday, January 5, 2017 10:05:03 AM
To: Felix Cheung
Cc: user@spark.apache.org

Subject: Re: Spark GraphFrame ConnectedComponents

Yes it works to read the vertices and edges data from S3 location and is also 
able to write the checkpoint files to S3. It only fails when deleting the data 
and that is because it tries to use the default file system. I tried looking up 
how to update the default file system but could not find anything in that 
regard.

Thanks
Ankur

On Thu, Jan 5, 2017 at 12:55 AM, Felix Cheung 
> wrote:
From the stack it looks to be an error from the explicit call to 
hadoop.fs.FileSystem.

Is the URL scheme for s3n registered?
Does it work when you try to read from s3 from Spark?

_
From: Ankur Srivastava 
>
Sent: Wednesday, January 4, 2017 9:23 PM
Subject: Re: Spark GraphFrame ConnectedComponents
To: Felix Cheung >
Cc: >



This is the exact trace from the driver logs

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: 
s3n:///8ac233e4-10f9-4eb3-aa53-df6d9d7ea7be/connected-components-c1dbc2b0/3,
 expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at 
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:534)
at 
org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$ConnectedComponents$$run(ConnectedComponents.scala:340)
at org.graphframes.lib.ConnectedComponents.run(ConnectedComponents.scala:139)
at GraphTest.main(GraphTest.java:31) --- Application Class
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

And I am running spark v 1.6.2 and graphframes v 0.3.0-spark1.6-s_2.10

Thanks
Ankur

On Wed, Jan 4, 2017 at 8:03 PM, Ankur Srivastava 
> wrote:
Hi

I am rerunning the pipeline to generate the exact trace, I have below part of 
trace from last run:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: 
s3n://, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
at 
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:516)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:528)

Also I think the error is happening in this part of the code 
"ConnectedComponents.scala:339" I am referring the code 
@https://github.com/graphframes/graphframes/blob/master/src/main/scala/org/graphframes/lib/ConnectedComponents.scala

  if (shouldCheckpoint && (iteration % checkpointInterval == 0)) {
// TODO: remove this after 

Re: Spark Read from Google store and save in AWS s3

2017-01-06 Thread Steve Loughran

On 5 Jan 2017, at 20:07, Manohar Reddy 
> wrote:

Hi Steve,
Thanks for the reply and below is follow-up help needed from you.
Do you mean we can set up two native file systems on a single SparkContext, so 
that based on the URL prefix (gs://bucket/path and dest s3a://bucket-on-s3/path2) 
it will identify and read/write against the appropriate cloud?

Is that my understanding right?


I wouldn't use the term "native FS", as they are all just client libraries to 
talk to the relevant object stores. You'd still have to have the cluster 
"default" FS.

but yes, you can use them: get your classpath right and they are all just URLs 
you use in your code
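
A minimal sketch, assuming the GCS connector and the hadoop-aws/aws-sdk JARs (plus 
credentials for both stores) are on the classpath, and a Spark 2.x SparkSession 
named `spark`; bucket names are illustrative:

val df = spark.read.parquet("gs://source-bucket/path")
// ... whatever processing you need ...
df.write.parquet("s3a://bucket-on-s3/path2")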


Re: Spark Read from Google store and save in AWS s3

2017-01-05 Thread Steve Loughran

On 5 Jan 2017, at 09:58, Manohar753 
> wrote:

Hi All,

Using Spark, is interoperability/communication between two 
clouds (Google, AWS) possible?
in my use case i need to take Google store as input to spark and do some
processing and finally needs to store in S3 and my spark engine runs on AWS
Cluster.

Please let me know if there is any way to do this kind of use case by using 
Spark directly, without any middle components, and share the info or a link if 
you have one.

Thanks,


I've not played with GCS, and have some noted concerns about test coverage ( 
https://github.com/GoogleCloudPlatform/bigdata-interop/pull/40 ) , but assuming 
you are not hitting any specific problems, it should be a matter of having the 
input as gs://bucket/path and dest s3a://bucket-on-s3/path2

You'll need the google storage JARs on your classpath, along with those needed 
for S3n/s3a.

1. little talk on the topic, though I only play with azure and s3
https://www.youtube.com/watch?v=ND4L_zSDqF0

2. some notes; bear in mind that the s3a performance tuning covered relates to 
things surfacing in Hadoop 2.8, which you probably won't have.


https://hortonworks.github.io/hdp-aws/s3-spark/

A one line test for s3 installed is can you read the landsat CSV file

sparkContext.textFile("s3a://landsat-pds/scene_list.gz").count()

this should work from wherever you are if your classpath and credentials are 
set up


Re: How to load a big csv to dataframe in Spark 1.6

2017-01-03 Thread Steve Loughran

On 31 Dec 2016, at 16:09, Raymond Xie 
> wrote:

Hello Felix,

I followed the instruction and ran the command:

> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

and I received the following error message:
java.lang.RuntimeException: java.net.ConnectException: Call From 
xie1/192.168.112.150 to localhost:9000 failed on 
connection exception: java.net.ConnectException: Connection refused; For more 
details see:  http://wiki.apache.org/hadoop/ConnectionRefused



Did you look at the wiki page? If not, why not?



Re: Question about Spark and filesystems

2017-01-03 Thread Steve Loughran

On 18 Dec 2016, at 19:50, joa...@verona.se wrote:

Since each Spark worker node needs to access the same files, we have
tried using Hdfs. This worked, but there were some oddities making me a
bit uneasy. For dependency hell reasons I compiled a modified Spark, and
this version exhibited the odd behaviour with Hdfs. The problem might
have nothing to do with Hdfs, but the situation made me curious about
the alternatives.

what were the oddities?


Re: Gradle dependency problem with spark

2016-12-16 Thread Steve Loughran
FWIW, although the underlying Hadoop declared guava dependency is pretty low, 
everything in org.apache.hadoop is set up to run against later versions. It 
just sticks with the old one to avoid breaking anything downstream which does 
expect a low version number. See HADOOP-10101 for the ongoing pain there —and 
complain on there if you do find something in the Hadoop layer which can't 
handle later guava versions.




On 16 Dec 2016, at 11:07, Sean Owen 
> wrote:

Yes, that's the problem. Guava isn't generally mutually compatible across more 
than a couple major releases. You may have to hunt for a version that happens 
to have the functionality that both dependencies want, and hope that exists. 
Spark should shade Guava at this point but doesn't mean that you won't hit this 
problem from transitive dependencies.

On Fri, Dec 16, 2016 at 11:05 AM kant kodali 
> wrote:
I replaced guava-14.0.1.jar  with guava-19.0.jar in SPARK_HOME/jars and seem to 
work ok but I am not sure if it is the right thing to do. My fear is that if 
Spark uses features from Guava that are only present in 14.0.1 but not in 19.0 
I guess my app will break.



On Fri, Dec 16, 2016 at 2:22 AM, kant kodali 
> wrote:
Hi Guys,

Here is the simplified version of my problem. I have the following problem and 
I new to gradle



dependencies {
compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.2'
compile group: 'com.github.brainlag', name: 'nsq-client', version: 
'1.0.0.RC2'
}

I took out the other dependencies for simplicity. The problem here is 
spark-core_2.11 uses com.google.guava:14.0.1 and nsq-client uses 
com.google.guava:19.0 so when I submit my fat uber jar using spark-submit I get 
the following error


Exception in thread "main" java.lang.NoSuchMethodError: 
com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set;
at com.github.brainlag.nsq.NSQProducer.<init>(NSQProducer.java:22)
at com.hello.streamprocessing.app.SparkDriver2.main(SparkDriver2.java:37)

any help would be great. Also if you need more description you can find it 
here

Thanks!





Re: Handling Exception or Control in spark dataframe write()

2016-12-16 Thread Steve Loughran

> On 14 Dec 2016, at 18:10, bhayat  wrote:
> 
> Hello,
> 
> I am writing my RDD into parquet format but what i understand that write()
> method is still experimental and i do not know how i will deal with possible
> exceptions.
> 
> For example:
> 
> schemaXXX.write().mode(saveMode).parquet(parquetPathInHdfs);
> 
> In this example i do not know how i will handle exception if parquet path
> does not exist or host is not reachable.


the parent path will be created. You are more likely to see a problem if the 
final path does exist. 

> 
> Do you have any way to do it ?

generally, catch the IOExceptions raised and report them. The HDFS IPC layer 
has a fair amount of retry logic built in to handle transient outages of the 
namenode/datanodes (and long GC pauses, which look similar); when they give up 
you'll see an IOException of some kind or other. All other filesystem API calls 
tend to raise IOExceptions too. try/catch are your friends.

What is hard is: what do you do next? Retry? Give up? I don't think there's a 
clear consensus there
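
As an illustration only (a sketch, not code from this thread; df, saveMode and 
the path are the placeholders used in the question), the catch-and-decide 
pattern looks something like this:

import java.io.IOException
import org.apache.spark.sql.{DataFrame, SaveMode}

// sketch: wrap the write, retry a couple of times on IOException, then give up
def writeParquetWithRetry(df: DataFrame, saveMode: SaveMode, path: String,
                          attempts: Int = 3): Unit = {
  var remaining = attempts
  var done = false
  while (!done) {
    try {
      df.write.mode(saveMode).parquet(path)
      done = true
    } catch {
      case e: IOException if remaining > 1 =>
        remaining -= 1
        Thread.sleep(30 * 1000)        // crude pause before retrying
      case e: IOException =>
        throw e                        // out of retries: surface the failure
    }
  }
}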

> 
> Thank you,
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Handling-Exception-or-Control-in-spark-dataframe-write-tp28210.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Few questions on reliability of accumulators value.

2016-12-15 Thread Steve Loughran

On 12 Dec 2016, at 19:57, Daniel Siegmann 
> wrote:

Accumulators are generally unreliable and should not be used. The answer to (2) 
and (4) is yes. The answer to (3) is both.

Here's a more in-depth explanation: 
http://imranrashid.com/posts/Spark-Accumulators/


That's a really nice article.

Accumulators work for generating statistics of things, such as bytes 
read/written (used internally for this), network errors ignored, etc. These 
things stay correct on retries: if you read more bytes, the byte counter should 
increase.

Where they are dangerous is when they are treated as a real output of work, an 
answer to some query, albeit just a side effect. People have been doing that 
with MR counters since Hadoop 0.1x, so there's no need to feel bad about 
trying; everyone tries at one point. In Hadoop 1.x, trying to create too many 
counters would actually overload the entire job tracker; At some point a 
per-job limit went in for that reason; it's still in the MR code to keep costs 
down.

Spark's accumulators only use up your cluster's storage + extra data on the 
heartbeats, but because of retries it's less an accumulator of results, and 
more an accumulator of 'things that happened during one or more executions of a 
function against an RDD'

You should really be generating all output as the output of a series of 
functional operations on RDDs.


Re: WARN util.NativeCodeLoader

2016-12-12 Thread Steve Loughran

> On 8 Dec 2016, at 06:38, baipeng  wrote:
> 
> Hi ALL
> 
> I'm new to Spark. When I execute spark-shell, the first line is as 
> follows
> WARN util.NativeCodeLoader: Unable to load native-hadoop library for your 
> platform... using builtin-java classes where applicable.
> Can someone tell me how to solve the problem?
> 


It's low level hadoop code warning that it couldn't load the hadoop native 
libraries that are needed for performance, and, on windows, basic 
functionality. 

On the servers, it's a sign that performance when working with compressed files, 
HDFS encryption, and other things is going to be worse than it could be.

in your client: it's a distraction.

The fix: turn down logging there

log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR



> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 



Re: Access multiple cluster

2016-12-05 Thread Steve Loughran
if the remote filesystem is visible from the other, then a different HDFS URI, 
e.g. hdfs://analytics:8000/historical/, can be used for reads & writes, even if 
your defaultFS (the one where you get max performance) is, say, 
hdfs://processing:8000/

-performance will be slower, in both directions
-if you have a fast pipe between the two clusters, then a job with many 
executors may unintentionally saturate the network, leading to unhappy people 
elsewhere.
-you'd better have mutual trust at the kerberos layer. There's a configuration 
option (I forget its name) to give spark-submit a list of hdfs namenodes it 
will need to get tokens from. Unless your spark cluster is being launched with 
keytabs, you will need to list upfront all hdfs clusters your job intends to 
work with

On 4 Dec 2016, at 21:45, ayan guha 
> wrote:


Hi

Is it possible to access hive tables sitting on multiple clusters in a single 
spark application?

We have a data processing cluster and analytics cluster. I want to join a table 
from analytics cluster with another table in processing cluster and finally 
write back in analytics cluster.

Best
Ayan



Re: What benefits do we really get out of colocation?

2016-12-03 Thread Steve Loughran

On 3 Dec 2016, at 09:16, Manish Malhotra 
> wrote:

thanks for sharing number as well !

Nowadays even the network can have very high throughput, and might outperform 
the disk, but as Sean mentioned, data over the network has other dependencies 
such as network hops, e.g. if it's across racks, with a switch in between.

But yes people are discussing and talking about Mesos + high performance 
network and not worried about the colocation for various use cases.

AWS ephemeral storage is not good for a reliable storage file system; EBS is the 
expensive alternative :)


If you are working with HDFS, then on Linux HDFS can bypass the entire network 
stack: after opening a block for an authenticated user, HDFS passes the open 
file handle back to the caller for them to talk direct to the filesystem. You 
can't get any faster than that.

On AWS, well, your life is complex as networking is now something you get to 
pay for in your choice of VM and storage options; it is going to generally 
offer lower performance than a physical cluster.

Me? I'd recommend using HDFS for transient storage and then s3 for persistent 
storage of the final data


On Sat, Dec 3, 2016 at 1:12 AM, kant kodali 
> wrote:
Thanks Sean! Just for the record I am currently seeing 95 MB/s RX (Receive 
throughput ) on my spark worker machine when I do `sudo iftop -B`

The problem with instance store on AWS is that they all are ephemeral so 
placing Cassandra on top doesn't make a lot of sense. so In short, AWS doesn't 
seem to be the right place for colocating in theory. I would still give you the 
benefit of doubt and colocate :) but just the numbers are not reflecting 
significant margins in terms of performance gains for AWS


On Sat, Dec 3, 2016 at 12:56 AM, Sean Owen 
> wrote:
I'm sure he meant that this is downside to not colocating.
You are asking the right question. While networking is traditionally much 
slower than disk, that changes a bit in the cloud, where attached storage is 
remote too.
The disk throughput here is mostly achievable in normal workloads. However I 
think you'll find it's going to be much harder to get 1Gbps out of network 
transfers. That's just the speed of the local interface, and of course the 
transfer speed depends on hops across the network beyond that. Network latency 
is going to be higher than disk too, though that's not as much an issue in this 
context.

On Sat, Dec 3, 2016 at 8:42 AM kant kodali 
> wrote:
wait, how is that a benefit? isn't that a bad thing if you are saying 
colocating leads to more latency  and overall execution time is longer?

On Sat, Dec 3, 2016 at 12:34 AM, vincent gromakowski 
> wrote:

You get more latency on reads so overall execution time is longer

Le 3 déc. 2016 7:39 AM, "kant kodali" 
> a écrit :

I wonder what benefits I really get if I colocate my spark worker process 
and Cassandra server process on each node?

I understand the concept of moving compute towards the data instead of moving 
data towards computation but It sounds more like one is trying to optimize for 
network latency.

Majority of my nodes (m4.xlarge)  have 1Gbps = 125MB/s (Megabytes per second) 
Network throughput.

and the DISK throughput for m4.xlarge is 93.75 MB/s (link below)

http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSOptimized.html

so In this case I don't see how colocation can help even if there is one to one 
mapping from spark worker node to a colocated Cassandra node where say we are 
doing a table scan of billion rows ?

Thanks!







Re: Spark ignoring partition names without equals (=) separator

2016-11-29 Thread Steve Loughran

On 29 Nov 2016, at 05:19, Prasanna Santhanam <t...@apache.org> wrote:

On Mon, Nov 28, 2016 at 4:39 PM, Steve Loughran <ste...@hortonworks.com> wrote:

irrespective of naming, know that deep directory trees are performance killers 
when listing files on s3 and setting up jobs. You might actually be better off 
having them in the same directory and using a pattern like 2016-03-11-*
as the pattern to find files.

Thanks Bharat and Steve - I've generally followed the partitioned table format 
over the flat structure since this aids WHERE clause filtering 
(PredicatePushDown?). Wrt performance that helps the write once, query many 
times kind of workloads. Changing this in our production application that dumps 
these is cumbersome. Is there a configuration that would override this 
restriction for Spark? Does it make sense to have one?

if it's done, leave it alone. Just be aware that s3 doesn't like deep directories 
that much, as listing is fairly slow



Re: Spark ignoring partition names without equals (=) separator

2016-11-28 Thread Steve Loughran

irrespective of naming, know that deep directory trees are performance killers 
when listing files on s3 and setting up jobs. You might actually be better off 
having them in the same directory and using a pattern like 2016-03-11-*
as the pattern to find files.
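
For illustration (the bucket and dates are the example below; the code and the 
choice of JSON as the file format are mine):

// flat layout: pick a day with a glob rather than walking a deep tree
val day = spark.read.json("s3a://bucket-company/path/2016-03-11-*")

// hive-style layout: year/month/day become partition columns and a WHERE
// clause on them prunes the directories that get read
val partitioned = spark.read.json("s3a://bucket-company/path/")
  .where("year = '2016' AND month = '03' AND day = '11'")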



On 28 Nov 2016, at 04:18, Prasanna Santhanam 
> wrote:

I've been toying around with Spark SQL lately and trying to move some workloads 
from Hive. In the hive world the partitions below are recovered on an ALTER 
TABLE RECOVER PARTITIONS

Path:
s3://bucket-company/path/2016/03/11
s3://bucket-company/path/2016/03/12
s3://bucket-company/path/2016/03/13

Where as Spark ignores these unless the partition information is of the format 
below

s3://bucket-company/path/year=2016/month=03/day=11
s3://bucket-company/path/year=2016/month=03/day=12
s3://bucket-company/path/year=2016/month=03/day=13

The code for this is in 
ddl.scala.
If my DDL already expresses the partition information why does Spark ignore the 
partition and enforce this separator?

DDL:
CREATE EXTERNAL TABLE test_tbl
(
   column1 STRING,
   column2 STRUCT  <... >
)
PARTITIONED BY (year STRING, month STRING, day STRING)
LOCATION s3://bucket-company/path

Thanks,







Re: Third party library

2016-11-27 Thread Steve Loughran

On 27 Nov 2016, at 02:55, kant kodali 
> wrote:

I would say instead of LD_LIBRARY_PATH you might want to use java.library.path

in the following way

java -Djava.library.path=/path/to/my/library or pass java.library.path along 
with spark-submit


This is only going to set up paths on the submitting system; to load JNI code 
in the executors, the binary needs to be sent to the far end and then put on the 
Java load path there.

Copy the relevant binary to somewhere on the PATH of the destination machine. 
Do that and you shouldn't have to worry about other JVM options, (though it's 
been a few years since I did any JNI).

One trick: write a simple main() object/entry point which calls the JNI method, 
and doesn't attempt to use any spark libraries; have it log any exception and 
return an error code if the call failed. This will let you use it as a link 
test after deployment: if you can't run that class then things are broken, 
before you go near spark
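
A rough sketch of such an entry point, reusing the hypothetical names from this 
thread (MySimpleApp, fooMethod, foo-C-library):

object JniLinkTest {
  def main(args: Array[String]): Unit = {
    try {
      System.loadLibrary("foo-C-library")      // fails fast if the .so isn't on java.library.path
      val result = new MySimpleApp().fooMethod("probe")
      println(s"JNI call succeeded: $result")
    } catch {
      case t: Throwable =>
        t.printStackTrace()
        System.exit(1)                         // non-zero exit marks a broken node
    }
  }
}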


On Sat, Nov 26, 2016 at 6:44 PM, Gmail 
> wrote:
Maybe you've already checked these out. Some basic questions that come to my 
mind are:
1) is this library "foolib" or "foo-C-library" available on the worker node?
2) if yes, is it accessible by the user/program (rwx)?

Thanks,
Vasu.

On Nov 26, 2016, at 5:08 PM, kant kodali 
> wrote:

If it is working for standalone program I would think you can apply the same 
settings across all the spark worker  and client machines and give that a try. 
Lets start with that.

On Sat, Nov 26, 2016 at 11:59 AM, vineet chadha 
> wrote:
Just subscribed to  Spark User.  So, forwarding message again.

On Sat, Nov 26, 2016 at 11:50 AM, vineet chadha 
> wrote:
Thanks Kant. Can you give me a sample program which allows me to call jni from 
executor task ?   I have jni working in standalone program in scala/java.

Regards,
Vineet

On Sat, Nov 26, 2016 at 11:43 AM, kant kodali 
> wrote:
Yes this is a Java JNI question. Nothing to do with Spark really.

 java.lang.UnsatisfiedLinkError typically would mean the way you setup 
LD_LIBRARY_PATH is wrong unless you tell us that it is working for other cases 
but not this one.

On Sat, Nov 26, 2016 at 11:23 AM, Reynold Xin 
> wrote:
That's just standard JNI and has nothing to do with Spark, does it?


On Sat, Nov 26, 2016 at 11:19 AM, vineet chadha 
> wrote:
Thanks Reynold for quick reply.

 I have tried following:

class MySimpleApp {
 // ---Native methods
  @native def fooMethod (foo: String): String
}

object MySimpleApp {
  val flag = false
  def loadResources() {
System.loadLibrary("foo-C-library")
  val flag = true
  }
  def main() {
sc.parallelize(1 to 10).mapPartitions ( iter => {
  if(flag == false){
  MySimpleApp.loadResources()
 val SimpleInstance = new MySimpleApp
  }
  SimpleInstance.fooMethod ("fooString")
  iter
})
  }
}

I don't see a way to invoke fooMethod, which is implemented in foo-C-library. Am 
I missing something? If possible, can you point me to an existing implementation 
which I can refer to.
which i can refer to.

Thanks again.

~

On Fri, Nov 25, 2016 at 3:32 PM, Reynold Xin 
> wrote:
bcc dev@ and add user@


This is more a user@ list question rather than a dev@ list question. You can do 
something like this:

object MySimpleApp {
  def loadResources(): Unit = // define some idempotent way to load resources, 
e.g. with a flag or lazy val

  def main() = {
...

sc.parallelize(1 to 10).mapPartitions { iter =>
  MySimpleApp.loadResources()

  // do whatever you want with the iterator
}
  }
}





On Fri, Nov 25, 2016 at 2:33 PM, vineet chadha 
> wrote:
Hi,

I am trying to invoke C library from the Spark Stack using JNI interface (here 
is sample  application code)


class SimpleApp {
 // ---Native methods
@native def foo (Top: String): String
}

object SimpleApp  {
   def main(args: Array[String]) {

val conf = new 
SparkConf().setAppName("SimpleApplication").set("SPARK_LIBRARY_PATH", "lib")
val sc = new SparkContext(conf)
 System.loadLibrary("foolib")
//instantiate the class
 val SimpleAppInstance = new SimpleApp
//String passing - Working
val ret = SimpleAppInstance.foo("fooString")
  }

The above code works fine.

I have setup LD_LIBRARY_PATH and spark.executor.extraClassPath,  
spark.executor.extraLibraryPath at worker node

How can i invoke JNI library from worker node ? Where should i load it in 
executor ?
Calling System.loadLibrary("foolib") inside the worker node gives me the 
following error:

Exception in thread "main" 

Re: covert local tsv file to orc file on distributed cloud storage(openstack).

2016-11-25 Thread Steve Loughran
have the first path be something like 
.csv("file:///home/user/dataset/data.csv")


If you are working with files that big
 -don't use the inferSchema option, as that will trigger two scans through the 
data
 -try with a smaller file first, say 1MB or so

Trying to use spark *or any other tool* to upload a 150GB file to openstack is 
probably doomed. In fact, given how openstack handles large files, you are 
going to be in trouble. Spark will try and upload the file in blocks (size in 
KB set in spark.hadoop.fs.swift..partsize), but there's no retry logic in that 
code by the look of things, so the failure of any single PUT of a block will 
cause the run to fail. Spark will retry, but you could just hit the same 
problem again. Smaller file sizes: parallel processing, more resilience to 
failures, and easier to break things up later on.

Really, really try to break things up.

That said, given that one of the problems is that the hadoop swift client doesn't 
do much retrying on failed writes, do the CSV -> ORC + snappy conversion locally 
in spark, then do the upload to swift using any tools you have to hand, be they 
command line or GUI. That should at least isolate the upload problem from the 
conversion.
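
A sketch of that local conversion step (Spark 2.x assumed; the three-column 
schema and paths are invented, so declare your real one, which also means the 
150GB file is only scanned once):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("ts", TimestampType)))

spark.read
  .option("header", "true")
  .option("sep", "\t")                        // tab-separated input
  .schema(schema)
  .csv("file:///home/user/dataset/data.tsv")
  .write
  .option("compression", "snappy")
  .mode("overwrite")
  .orc("file:///tmp/dataset-orc/")            // then upload this directory to swift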



On 24 Nov 2016, at 18:44, vr spark <vrspark...@gmail.com> wrote:

Hi, The source file i have is on local machine and its pretty huge like 150 gb. 
 How to go about it?

On Sun, Nov 20, 2016 at 8:52 AM, Steve Loughran <ste...@hortonworks.com> wrote:

On 19 Nov 2016, at 17:21, vr spark <vrspark...@gmail.com> wrote:

Hi,
I am looking for scala or python code samples to covert local tsv file to orc 
file and store on distributed cloud storage(openstack).

So, need these 3 samples. Please suggest.

1. read tsv
2. convert to orc
3. store on distributed cloud storage


thanks
VR

all options, 9 lines of code, assuming a spark context has already been setup 
with the permissions to write to AWS, and the relevant JARs for S3A to work on 
the CP. The read operation is inefficient as to determine the schema it scans 
the (here, remote) file twice; that may be OK for an example, but I wouldn't do 
that in production. The source is a real file belonging to amazon; dest a 
bucket of mine.

More details at: 
http://www.slideshare.net/steve_l/apache-spark-and-object-stores


val csvdata = spark.read.options(Map(
  "header" -> "true",
  "ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "timestampFormat" -> "-MM-dd HH:mm:ss.SSSZZZ",
  "inferSchema" -> "true",
  "mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
csvdata.write.mode("overwrite").orc("s3a://hwdev-stevel-demo2/landsatOrc")




Re: How to write a custom file system?

2016-11-22 Thread Steve Loughran

On 21 Nov 2016, at 17:26, Samy Dindane 
> wrote:

Hi,

I'd like to extend the file:// file system and add some custom logic to the API 
that lists files.
I think I need to extend FileSystem or LocalFileSystem from 
org.apache.hadoop.fs, but I am not sure how to go about it exactly.


subclass it, then declare it as a

spark.hadoop.fs.samy.impl=SamyFSClass

to use urls like samy://home/files/data/*


You can also rebind file:// to point to your new fs, by overriding fs.file.impl
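
Very roughly, and only as a sketch (the class name matches the property above; 
the listing tweak is invented):

import org.apache.hadoop.fs.{FileStatus, LocalFileSystem, Path}

class SamyFSClass extends LocalFileSystem {
  override def getScheme: String = "samy"

  override def listStatus(f: Path): Array[FileStatus] = {
    // custom listing logic goes here; this example just hides files starting with "_"
    super.listStatus(f).filterNot(_.getPath.getName.startsWith("_"))
  }
}

// registered with spark.hadoop.fs.samy.impl=<fully qualified name of SamyFSClass>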

There's a fairly formal definition of what a filesystem is meant to do

https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/filesystem.html

and lots of contract tests on each of the operations; you can find them all in 
the hadoop-common tests source tree. If you are thinking of doing anything 
non-trivial with filesystems, get these tests working before you start changing 
things. But know that, as these tests don't generate load or concurrent 
requests, they aren't sufficient to say that stuff works, only to identify when 
it is broken at a basic level.


 GlusterFS comes from Redhat, they've got a connector which works with Hadoop & 
Spark code. Have you used it?


How to write a custom file system and make it usable by Spark?

Thank you,

Samy

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





Re: covert local tsv file to orc file on distributed cloud storage(openstack).

2016-11-20 Thread Steve Loughran

On 19 Nov 2016, at 17:21, vr spark 
> wrote:

Hi,
I am looking for scala or python code samples to covert local tsv file to orc 
file and store on distributed cloud storage(openstack).

So, need these 3 samples. Please suggest.

1. read tsv
2. convert to orc
3. store on distributed cloud storage


thanks
VR

all options, 9 lines of code, assuming a spark context has already been setup 
with the permissions to write to AWS, and the relevant JARs for S3A to work on 
the CP. The read operation is inefficient as to determine the schema it scans 
the (here, remote) file twice; that may be OK for an example, but I wouldn't do 
that in production. The source is a real file belonging to amazon; dest a 
bucket of mine.

More details at: 
http://www.slideshare.net/steve_l/apache-spark-and-object-stores


val csvdata = spark.read.options(Map(
  "header" -> "true",
  "ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "timestampFormat" -> "-MM-dd HH:mm:ss.SSSZZZ",
  "inferSchema" -> "true",
  "mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
csvdata.write.mode("overwrite").orc("s3a://hwdev-stevel-demo2/landsatOrc")


Re: Run spark with hadoop snapshot

2016-11-19 Thread Steve Loughran
I'd recommend you build a full spark release with the new hadoop version; you 
should have built that locally earlier the same day (so that ivy/maven pick up 
the snapshot)


dev/make-distribution.sh -Pyarn,hadoop-2.7,hive -Dhadoop.version=2.9.0-SNAPSHOT;



> On 18 Nov 2016, at 19:31, lminer  wrote:
> 
> I'm trying to figure out how to run spark with a snapshot of Hadoop 2.8 that
> I built myself. I'm unclear on the configuration needed to get spark to work
> with the snapshot.
> 
> I'm running spark on mesos. Per the spark documentation, I run spark-submit
> as follows using the `spark-2.0.2-bin-without-hadoop`, but spark doesn't
> appear to be finding hadoop 2.8.
> 
>export SPARK_DIST_CLASSPATH=$(/path/to/hadoop2.8/bin/hadoop classpath)
>spark-submit --verbose --master mesos://$MASTER_HOST/mesos
> 
> I get the error:
> 
>Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/hadoop/fs/FSDataInputStream
>at
> org.apache.spark.deploy.SparkSubmitArguments.handle(SparkSubmitArguments.scala:403)
>at
> org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:163)
>at
> org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:98)
>at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:117)
>at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.fs.FSDataInputStream
>at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>at java.security.AccessController.doPrivileged(Native Method)
>at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>... 5 more
> 
> Any ideas on the proper configuration?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Run-spark-with-hadoop-snapshot-tp28105.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Long-running job OOMs driver process

2016-11-18 Thread Steve Loughran

On 18 Nov 2016, at 14:31, Keith Bourgoin 
> wrote:

We thread the file processing to amortize the cost of things like getting files 
from S3.

Define cost here: actual $ amount, or merely time to read the data?

If it's read times, you should really be trying the new stuff coming in the 
hadoop-2.8+ s3a client, which has put a lot of work into higher performance 
reading of ORC & Parquet data, plus general improvements in listing/opening, 
etc, trying to cut down on slow metadata queries. You are still going to have 
delays of tens to hundreds of millis on every HTTP request (bigger ones for DNS 
problems and/or s3 load balancer overload), but once open, seek + read of s3 
data will be much faster (not end-to-end read of an s3 file though, that's just 
bandwidth limitation after the HTTPS negotiation).

http://www.slideshare.net/steve_l/hadoop-hive-spark-and-object-stores

Also, do make sure you are using s3a URLs, if you weren't already

-Steve


Re: Any with S3 experience with Spark? Having ListBucket issues

2016-11-18 Thread Steve Loughran

On 16 Nov 2016, at 22:34, Edden Burrow 
> wrote:

Anyone dealing with a lot of files with spark?  We're trying s3a with 2.0.1 
because we're seeing intermittent errors in S3 where jobs fail and saveAsText 
file fails. Using pyspark.

How many files? Thousands? Millions?

If you do have some big/complex file structure, I'd really like to know; it not 
only helps us make sure that spark/hive metastore/s3a can handle the layout, it 
may help improve some advice on what not to do.


Is there any issue with working in a S3 folder that has too many files?  How 
about having versioning enabled? Are these things going to be a problem?

Many, many files shouldn't be a problem, except for slowing down some 
operations, and creating larger memory structures to get passed round. 
Partitioning can get slow.


We're pre-building the s3 file list and storing it in a file and passing that 
to textFile as a long comma separated list of files - So we are not running 
list files.

But we get errors with saveAsText file, related to ListBucket.  Even though 
we're not using wildcard '*'.

org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: 
Failed to parse XML document with handler class 
org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler


at a guess, it'll be some checks before the write that the parent directory 
exists and the destination path isn't a directory.


Running spark 2.0.1 with the s3a protocol.

Not with a stack trace containing org.jets3t you aren't. That's what you'd 
expect for s3 and s3n; the key feature of s3a is moving onto the amazon SDK, 
where stack traces move to com.amazon classes

Make sure you *are* using s3a, ideally on Hadoop 2.7.x  (or even better, HDP 
2.5 where you get all the Hadoop 2.8 read pipeline optimisations) On Hadoop 
2.6.x there were still some stabilisation issues that only surfaced in the wild.

Some related slides 
http://www.slideshare.net/steve_l/apache-spark-and-object-stores

-Steve


Re: Delegation Token renewal in yarn-cluster

2016-11-04 Thread Steve Loughran

On 4 Nov 2016, at 01:37, Marcelo Vanzin 
> wrote:

On Thu, Nov 3, 2016 at 3:47 PM, Zsolt Tóth 
> wrote:
What is the purpose of the delegation token renewal (the one that is done
automatically by Hadoop libraries, after 1 day by default)? It seems that it
always happens (every day) until the token expires, no matter what. I'd
probably find an answer to that in a basic Hadoop security description.



* DTs allow a long lived job to outlast the Kerberos ticket lifetime of the 
submitter; usually 48-72h.
* submitting jobs with DTs limit the access of the job to those services for 
which you have a DT; no need to acquire Kerberos tickets for every query being 
run. This keeps load on kerberos down, which is good as with Active Directory 
that's usually shared with the rest of the organisation. Some kerberos servers 
treat a bulk access from a few thousand machines as a brute force attack.
* Delegation tokens can also be revoked at the NN. After a process terminates, 
something (YARN NM?) can chat with the NN and say "no longer valid". In 
contrast, Kerberos TGTs stay valid until that timeout, without any revocation 
mechanism.

I'm not sure and I never really got a good answer to that (I had the
same question in the past). My best guess is to limit how long an
attacker can do bad things if he gets hold of a delegation token. But
IMO if an attacker gets a delegation token, that's pretty bad
regardless of how long he can use it...


correct: limits the damage. In contrast, if someone has your keytab, they have 
access until that KT expires.




I have a feeling that giving the keytab to Spark bypasses the concept behind
delegation tokens. As I understand, the NN basically says that "your
application can access hdfs with this delegation token, but only for 7
days".

I'm not sure why there's a 7 day limit either, but let's assume
there's a good reason. Basically the app, at that point, needs to
prove to the NN it has a valid kerberos credential. Whether that's
from someone typing their password into a terminal, or code using a
keytab, it doesn't really matter. If someone was worried about that
user being malicious they'd disable the user's login in the KDC.

This feature is needed because there are apps that need to keep
running, unattended, for longer than HDFS's max lifetime setting.


pretty much it. FWIW that's why turning Kerberos on midweek morning, rather 
than a friday evening, is wise. The 7 day timeout event will start happening 
during working hours.

https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md



Re: sanboxing spark executors

2016-11-04 Thread Steve Loughran

> On 4 Nov 2016, at 06:41, blazespinnaker  wrote:
> 
> Is there a good method / discussion / documentation on how to sandbox a spark
> executor?   Assume the code is untrusted and you don't want it to be able to
> make un validated network connections or do unvalidated alluxio/hdfs/file


use Kerberos to auth HDFS connections, HBase, Hive. When enabled spark 
processes (all yarn processes) will run as different users in the cluster for 
isolation there too.

no easy way to monitor/block general outbound network connections though.  

> io.
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/sanboxing-spark-executors-tp28014.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.0 with Hadoop 3.0?

2016-10-29 Thread Steve Loughran

On 27 Oct 2016, at 23:04, adam kramer 
> wrote:

Is the version of Spark built for Hadoop 2.7 and later only for 2.x releases?

Is there any reason why Hadoop 3.0 is a non-starter for use with Spark
2.0? The version of aws-sdk in 3.0 actually works for DynamoDB which
would resolve our driver dependency issues.

what version problems are you having there?


There's a patch to move to AWS SDK 10.10, but that has a jackson 2.6.6+ 
dependency; that being something I'd like to do in Hadoop branch-2 as well, as 
it is Time to Move On ( HADOOP-12705 ) . FWIW all jackson 1.9 dependencies have 
been ripped out, leaving on that 2.x version problem.

https://issues.apache.org/jira/browse/HADOOP-13050

The HADOOP-13345 s3guard work will pull in a (provided) dependency on dynamodb; 
looks like the HADOOP-13449 patch moves to SDK 1.11.0.

I think we are likely to backport that to branch-2 as well, though it'd help 
the dev & test there if you built and tested your code against trunk early —not 
least to find any changes in that transitive dependency set.


Thanks,
Adam

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





Re: Spark security

2016-10-27 Thread Steve Loughran

On 13 Oct 2016, at 14:40, Mendelson, Assaf 
> wrote:

Hi,
We have a spark cluster and we wanted to add some security for it. I was 
looking at the documentation (in  
http://spark.apache.org/docs/latest/security.html) and had some questions.
1.   Do all executors listen by the same blockManager port? For example, in 
yarn there are multiple executors per node, do they all listen to the same port?

On YARN the executors will come up on their own ports.

2.   Are ports defined in earlier version (e.g. 
http://spark.apache.org/docs/1.6.1/security.html) and removed in the latest 
(such as spark.executor.port and spark.fileserver.port) gone and can be blocked?
3.   If I define multiple workers per node in spark standalone mode, how do 
I set the different ports for each worker (there is only one 
spark.worker.ui.port / SPARK_WORKER_WEBUI_PORT definition. Do I have to start 
each worker separately to configure a port?) The same is true for the worker 
port (SPARK_WORKER_PORT)
4.   Is it possible to encrypt the logs instead of just limiting with 
permissions the log directory?

if writing to HDFS on a Hadoop 2.7+ cluster you can use HDFS Encryption At Rest 
to encrypt the data on the disks. If you are talking to S3 with the Hadoop 2.8+ 
libraries (not officially shipping), you can use S3 server side encryption with 
AWS managed keys too.

5.   Is the communication between the servers encrypted (e.g. using ssh?)

you can enable this;

https://spark.apache.org/docs/latest/security.html
https://spark.apache.org/docs/latest/configuration.html#security

spark.network.sasl.serverAlwaysEncrypt true
spark.authenticate.enableSaslEncryption true

I *believe* that encrypted shuffle comes with 2.1   
https://issues.apache.org/jira/browse/SPARK-5682

as usual, look in the source to really understand

there's various ways to interact with spark and within; you need to make sure 
they are all secured against malicious users

-web UI. on YARN, you can use SPNEGO to kerberos-auth the yarn RM proxy; the 
Spark UI will 302 all direct requests to its web UI back to that proxy. 
Communications behind the scnese between the RM and the Spark  UI will not, 
AFAIK, be encrypted/authed.

-spark-driver executor comms
-bulk data exchange between drivers
-shuffle service in executor, or hosted inside YARN node managers.
-spark-filesystem communications
-spark to other data source communications (Kafka, etc)

You're going to have go through them all and do the checklist.

As is usual in an open source project, documentation improvements are always 
welcome. There is a good security doc in the spark source —but I'm sure extra 
contributions will be welcome




6.   Are there any additional best practices beyond what is written in the 
documentation?
Thanks,

In a YARN cluster, Kerberos is mandatory if you want any form of security. 
Sorry.



Re: spark infers date to be timestamp type

2016-10-27 Thread Steve Loughran
CSV type inference isn't really ideal: it does a full scan of a file to 
determine this; you are doubling the amount of data you need to read. Unless 
you are just exploring files in your notebook, I'd recommend doing it once, 
getting the schema from it then using that as the basis for the code snippet 
where you really define the schema. That's when you can explicitly declare the 
schema types if the inferred ones aren't great.

(maybe I should write something which prints out the scala/py code for that 
declaration rather than having to do it by hand...)

On 27 Oct 2016, at 05:55, Hyukjin Kwon 
> wrote:

Hi Koert,


Sorry, I thought you meant this is a regression between 2.0.0 and 2.0.1. I just 
checked It has not been supporting to infer DateType before[1].

Yes, it only supports to infer such data as timestamps currently.


[1]https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L85-L92




2016-10-27 9:12 GMT+09:00 Anand Viswanathan 
>:
Hi,

you can use the customSchema(for DateType) and specify dateFormat in .option().
or
at spark dataframe side, you can convert the timestamp to date using cast to 
the column.

Thanks and regards,
Anand Viswanathan

On Oct 26, 2016, at 8:07 PM, Koert Kuipers 
> wrote:

hey,
i create a file called test.csv with contents:
date
2015-01-01
2016-03-05

next i run this code in spark 2.0.1:
spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("test.csv")
  .printSchema

the result is:
root
 |-- date: timestamp (nullable = true)


On Wed, Oct 26, 2016 at 7:35 PM, Hyukjin Kwon 
> wrote:

There are now timestampFormat for TimestampType and dateFormat for DateType.

Do you mind if I ask to share your codes?

On 27 Oct 2016 2:16 a.m., "Koert Kuipers" 
> wrote:
is there a reason a column with dates in format yyyy-MM-dd in a csv file is 
inferred to be TimestampType and not DateType?

thanks! koert






Re: Writing to Parquet Job turns to wait mode after even completion of job

2016-10-24 Thread Steve Loughran

On 24 Oct 2016, at 20:32, Cheng Lian <lian.cs@gmail.com> wrote:



On 10/22/16 6:18 AM, Steve Loughran wrote:

...
On Sat, Oct 22, 2016 at 3:41 AM, Cheng Lian <lian.cs@gmail.com> wrote:

What version of Spark are you using and how many output files does the job 
writes out?

By default, Spark versions before 1.6 (not including) writes Parquet summary 
files when committing the job. This process reads footers from all Parquet 
files in the destination directory and merges them together. This can be 
particularly bad if you are appending a small amount of data to a large 
existing Parquet dataset.

If that's the case, you may disable Parquet summary files by setting Hadoop 
configuration " parquet.enable.summary-metadata" to false.


Now I'm a bit mixed up. Should that be 
spark.sql.parquet.enable.summary-metadata =false?
No, "parquet.enable.summary-metadata" is a Hadoop configuration option 
introduced by Parquet. In Spark 2.0, you can simply set it using 
spark.conf.set(), Spark will propagate it properly.


OK, chased it down to a feature that ryanb @ netflix made optional, presumably 
for their s3 work (PARQUET-107).

Here is what I'd say makes a good set of options for S3A & Parquet:

spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
spark.hadoop.parquet.enable.summary-metadata false

While for ORC, you want


spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 1
spark.sql.orc.filterPushdown true

And:

spark.sql.hive.metastorePartitionPruning true

along with commitment via:

spark.speculation false
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped true



For when people get to play with the Hadoop S3A phase II binaries, they'll also 
be wanting

spark.hadoop.fs.s3a.readahead.range 157810688

// faster backward seek for ORC and Parquet input
spark.hadoop.fs.s3a.experimental.input.fadvise random

// PUT blocks in separate threads
spark.hadoop.fs.s3a.fast.output.enabled true


the fadvise one is *really* good when working with ORC/Parquet; without that 
column filtering and predicate pushdown is somewhat crippled.
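
Pulled together as a sketch (not a definitive recipe), the settings above can 
all be applied when the session is built:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-on-s3a")
  .config("spark.sql.parquet.filterPushdown", "true")
  .config("spark.sql.parquet.mergeSchema", "false")
  .config("spark.hadoop.parquet.enable.summary-metadata", "false")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.hadoop.fs.s3a.experimental.input.fadvise", "random")   // Hadoop 2.8+ only
  .getOrCreate()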



Re: Getting the IP address of Spark Driver in yarn-cluster mode

2016-10-24 Thread Steve Loughran

On 24 Oct 2016, at 19:34, Masood Krohy 
> wrote:

Hi everyone,

Is there a way to set the IP address/hostname that the Spark Driver is going to 
be running on when launching a program through spark-submit in yarn-cluster 
mode (PySpark 1.6.0)?

I do not see an option for this. If not, is there a way to get this IP address 
after the Spark app has started running? (through an API call at the beginning 
of the program to be used in the rest of the program). spark-submit outputs 
“ApplicationMaster host: 10.0.0.9” in the console (and changes on every run due 
to yarn cluster mode) and I am wondering if this can be accessed within the 
program. It does not seem to me that a YARN node label can be used to tie the 
Spark Driver/AM to a node, while allowing the Executors to run on all the nodes.



you can grab it off the YARN API itself; there's a REST view as well as a 
fussier RPC level. That is, assuming you want the web view, which is what is 
registered.

If you know the application ID, you can also construct a URL through the YARN 
proxy; any attempt to talk direct to the AM is going to get 302'd back there 
anyway so any kerberos credentials can be verified.
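
As a rough sketch (the RM address is an assumption and the JSON handling is 
deliberately crude; use a real JSON parser for anything serious), asking the 
RM's REST API for the application report from inside the program looks like:

import scala.io.Source

val rm = "http://resourcemanager:8088"            // your RM's web address
val appId = sc.applicationId                      // available inside the running app
val report = Source.fromURL(s"$rm/ws/v1/cluster/apps/$appId").mkString
val amHost = "\"amHostHttpAddress\"\\s*:\\s*\"([^\"]+)\"".r
  .findFirstMatchIn(report)
  .map(_.group(1))
println(s"ApplicationMaster host: $amHost")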



Re: Issues with reading gz files with Spark Streaming

2016-10-24 Thread Steve Loughran

On 22 Oct 2016, at 20:58, Nkechi Achara <nkach...@googlemail.com> wrote:

I do not use rename, and the files are written to, and then moved to a 
directory on HDFS in gz format.

in that case there's nothing obvious to me.

try logging at trace/debug the class:
org.apache.spark.sql.execution.streaming.FileStreamSource


On 22 October 2016 at 15:14, Steve Loughran <ste...@hortonworks.com> wrote:

> On 21 Oct 2016, at 15:53, Nkechi Achara <nkach...@googlemail.com> wrote:
>
> Hi,
>
> I am using Spark 1.5.0 to read gz files with textFileStream, but when new 
> files are dropped in the specified directory. I know this is only the case 
> with gz files as when i extract the file into the directory specified the 
> files are read on the next window and processed.
>
> My code is here:
>
> val comments = ssc.fileStream[LongWritable, Text, 
> TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly=false).
>   map(pair => pair._2.toString)
> comments.foreachRDD(i => i.foreach(m=> println(m)))
>
> any idea why the gz files are not being recognized.
>
> Thanks in advance,
>
> K

Are the files being written in the directory or renamed in? As you should be 
using rename() against a filesystem (not an object store) to make sure that the 
file isn't picked up




Re: Writing to Parquet Job turns to wait mode after even completion of job

2016-10-22 Thread Steve Loughran

On 22 Oct 2016, at 00:48, Chetan Khatri 
> wrote:

Hello Cheng,

Thank you for response.

I am using spark 1.6.1, i am writing around 350 gz parquet part files for 
single table. Processed around 180 GB of Data using Spark.

Are you writing to GCS storage or to the local HDD?

Regarding options to set, for performance reads against object store hosted 
parquet data, also go for

spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false






On Sat, Oct 22, 2016 at 3:41 AM, Cheng Lian 
> wrote:

What version of Spark are you using and how many output files does the job 
writes out?

By default, Spark versions before 1.6 (not including) writes Parquet summary 
files when committing the job. This process reads footers from all Parquet 
files in the destination directory and merges them together. This can be 
particularly bad if you are appending a small amount of data to a large 
existing Parquet dataset.

If that's the case, you may disable Parquet summary files by setting Hadoop 
configuration " parquet.enable.summary-metadata" to false.


Now I'm a bit mixed up. Should that be 
spark.sql.parquet.enable.summary-metadata =false?


We've disabled it by default since 1.6.0

Cheng

On 10/21/16 1:47 PM, Chetan Khatri wrote:
Hello Spark Users,

I am writing around 10 GB of Processed Data to Parquet where having 1 TB of HDD 
and 102 GB of RAM, 16 vCore machine on Google Cloud.

Every time, i write to parquet. it shows on Spark UI that stages succeeded but 
on spark shell it hold context on wait mode for almost 10 mins. then it clears 
broadcast, accumulator shared variables.

Can we speed up this thing?

Thanks.

--
Yours Aye,
Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA





--
Yours Aye,
Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA




Re: Issues with reading gz files with Spark Streaming

2016-10-22 Thread Steve Loughran

> On 21 Oct 2016, at 15:53, Nkechi Achara  wrote:
> 
> Hi, 
> 
> I am using Spark 1.5.0 to read gz files with textFileStream, but when new 
> files are dropped in the specified directory. I know this is only the case 
> with gz files as when i extract the file into the directory specified the 
> files are read on the next window and processed.
> 
> My code is here:
> 
> val comments = ssc.fileStream[LongWritable, Text, 
> TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly=false).
>   map(pair => pair._2.toString)
> comments.foreachRDD(i => i.foreach(m=> println(m)))
> 
> any idea why the gz files are not being recognized.
> 
> Thanks in advance,
> 
> K

Are the files being written in the directory or renamed in? As you should be 
using rename() against a filesystem (not an object store) to make sure that the 
file isn't picked up

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Why the json file used by sparkSession.read.json must be a valid json object per line

2016-10-20 Thread Steve Loughran

> On 19 Oct 2016, at 21:46, Jakob Odersky  wrote:
> 
> Another reason I could imagine is that files are often read from HDFS,
> which by default uses line terminators to separate records.
> 
> It is possible to implement your own hdfs delimiter finder, however
> for arbitrary json data, finding that delimiter would require stateful
> parsing of the file and would be difficult to parallelize across a
> cluster.
> 


good point. 

If you are creating your own files of a list of JSON records, then you could do 
your own encoding, one with say a header for each record (say 'J'+'S'+'O'+'N' + 
int64 length), and split on that: you don't need to scan a record to know its 
length, and you can scan a large document counting its records simply through a 
sequence of skip + read(byte[8]) operations.
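
A sketch of that framing, with all helper names invented:

import java.io.{DataInputStream, DataOutputStream, EOFException}
import java.nio.ByteBuffer

val MAGIC = "JSON".getBytes("US-ASCII")

def writeRecord(out: DataOutputStream, json: Array[Byte]): Unit = {
  out.write(MAGIC)                      // 4-byte marker
  out.writeLong(json.length.toLong)     // int64 length
  out.write(json)                       // the record itself
}

// count records without parsing them: read the 12-byte header, skip the payload
def countRecords(in: DataInputStream): Long = {
  var n = 0L
  try {
    while (true) {
      val header = new Array[Byte](12)
      in.readFully(header)
      val len = ByteBuffer.wrap(header, 4, 8).getLong
      var toSkip = len
      while (toSkip > 0) toSkip -= in.skip(toSkip)
      n += 1
    }
  } catch { case _: EOFException => }   // clean end of input
  n
}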

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark with kerberos

2016-10-19 Thread Steve Loughran

On 19 Oct 2016, at 00:18, Michael Segel 
> wrote:

(Sorry sent reply via wrong account.. )

Steve,

Kinda hijacking the thread, but I promise it's still on topic to the OP's issue.. ;-)

Usually you will end up having a local Kerberos set up per cluster.
So your machine accounts (hive, yarn, hbase, etc …) are going to be local  to 
the cluster.


not necessarily...you can share a KDC. And in a land of active directory you'd 
need some trust



So you will have to set up some sort of realm trusts between the clusters.

If you’re going to be setting up security (Kerberos … ick! shivers… ;-) you’re 
going to want to keep the machine accounts isolated to the cluster.
And the OP said that he didn’t control the other cluster which makes me believe 
that they are separate.


good point; you may not be able to get the tickets for cluster C accounts. But 
if you can log in as a user of cluster C, that may be enough; see below.


I would also think that you would have trouble with the credential… isn't it 
tied to a user at a specific machine?

there are two types of kerberos identity, simple "hdfs@REALM" and 
server-specific "hdfs/server@REALM". The simple ones work just as well in small 
clusters, it's just that in larger clusters your KDCs (especially AD) tend to 
interpret an attempt by 200 machines to log in as user "hdfs@REALM" in 30s as 
an attempt to brute force a password, and start rejecting logins. The 
separation into hdfs/_HOST_/REALM style avoids that, and may reduce the damage 
if the keytab leaks

If the user submitting work is logged into the KDC of cluster C, e.g:


kinit user@CLUSTERC


and spark is configured to ask for the extra namenode tokens,

spark.yarn.access.namenodes hdfs://cluster-c:8020


..then spark MAY ask for those tokens, pass them up to cluster B and so have 
them available for talking to cluster C. The submitted job is using the block 
tokens, so doesn't need to log in to kerberos itself, and if cluster B is 
insecure, doesn't need to worry about credentials and identity there. The HDFS 
client code just returns the block token to talk to cluster C when an attempt 
to talk to the DN of cluster C is rejected with an "authenticate yourself" 
response.

The main issue to me is: will that token get picked up and propagated to an 
insecure cluster, so as to support this operation? There's a risk here: the 
ubiquitous static method UserGroupInformation.isSecurityEnabled() is checked in 
places, and it returns false when the cluster itself isn't secure 
(hadoop.security.authentication in core-site.xml is "simple"). It looks like 
org.apache.spark.deploy.yarn.security.HDFSCredentialProvider is doing exactly 
that (as do HBase and Hive), meaning job submission doesn't fetch tokens unless 
the cluster you are submitting from is secure.

One thing that could be attempted, would be turning authentication on to 
kerberos just in the job launch config, and seeing if that will collect all 
required tokens *without* getting confused by the fact that YARN and HDFS don't 
need them.

spark.hadoop.hadoop.security.authentication=kerberos

I have no idea if this works; you've have to try it and see

(Its been a while since I looked at this and I drank heavily to forget 
Kerberos… so I may be a bit fuzzy here.)


denying all knowledge of Kerberos is always a good tactic.


Re: About Error while reading large JSON file in Spark

2016-10-19 Thread Steve Loughran

On 18 Oct 2016, at 10:58, Chetan Khatri 
> wrote:

Dear Xi shen,

Thank you for getting back to question.

The approach i am following are as below:
I have MSSQL server as Enterprise data lack.

1. run Java jobs and generated JSON files, every file is almost 6 GB.
Correct, spark needs every JSON object on a separate line, so I did
sed -e 's/}/}\n/g' -s old-file.json > new-file.json
to get every json element on separate lines.
2. uploaded to s3 bucket and reading from their using sqlContext.read.json() 
function, where i am getting above error.

Note: If i am running for small size files then i am not getting this error 
where JSON elements are almost same structured.

Current approach:


  *splitting large JSON(6 GB) to 1-1 GB then will process.

Note: Machine size is , 1 master and 2 slave, each 4 vcore, 26 GB RAM

I see what you are trying to do here: one JSON object per line, then splitting by 
line so that you can parallelise JSON processing, as well as holding many JSON 
objects in a single s3 file. This is a devious little trick. It just doesn't 
work once a single JSON document goes > 2^31 bytes long, as the code to split by 
line breaks.

You could write your own input splitter which actually does basic JSON parsing, 
splitting up by looking for the final } in a JSON clause (harder than you 
think, as you need to remember how many {} clauses you have entered and not 
include escaped "{" in strings).

a quick google shows some that may be a good starting point

https://github.com/Pivotal-Field-Engineering/pmr-common/blob/master/PivotalMRCommon/src/main/java/com/gopivotal/mapreduce/lib/input/JsonInputFormat.java
https://github.com/alexholmes/json-mapreduce



Re: About Error while reading large JSON file in Spark

2016-10-18 Thread Steve Loughran

On 18 Oct 2016, at 08:43, Chetan Khatri 
> wrote:

Hello Community members,

I am getting error while reading large JSON file in spark,


the underlying read code can't handle more than 2^31 bytes in a single line:

if (bytesConsumed > Integer.MAX_VALUE) {
  throw new IOException("Too many bytes before newline: " + bytesConsumed);
}

That's because it's trying to split work by line, and of course, there aren't 
lines

you need to move over to reading the JSON by other means, I'm afraid. At a 
guess, something involving SparkContext.binaryFiles() streaming the data 
straight into a JSON parser.
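
For instance, a rough sketch (untested against your data; it assumes the 
jackson-core streaming API already on Spark's classpath and just counts scalar 
fields per file):

import com.fasterxml.jackson.core.JsonFactory

val files = sc.binaryFiles("s3a://hist-ngdp/lvisitor/*.json")   // one stream per file
val fieldCounts = files.map { case (path, stream) =>
  val parser = new JsonFactory().createParser(stream.open())    // streaming: no giant line in memory
  var scalars = 0L
  try {
    while (parser.nextToken() != null) {
      if (parser.getCurrentToken.isScalarValue) scalars += 1
    }
  } finally parser.close()
  (path, scalars)
}
fieldCounts.collect().foreach(println)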



Code:

val landingVisitor = 
sqlContext.read.json("s3n://hist-ngdp/lvisitor/lvisitor-01-aug.json")

unrelated, but use s3a if you can. It's better, you know.


Error:

16/10/18 07:30:30 ERROR Executor: Exception in task 8.0 in stage 0.0 (TID 8)
java.io.IOException: Too many bytes before newline: 2147483648
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:249)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:135)
at 
org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:237)

What would be resolution for the same ?

Thanks in Advance !


--
Yours Aye,
Chetan Khatri.




Re: spark with kerberos

2016-10-18 Thread Steve Loughran

On 17 Oct 2016, at 22:11, Michael Segel <michael_se...@hotmail.com> wrote:

@Steve you are going to have to explain what you mean by ‘turn Kerberos on’.

Taken one way… it could mean making cluster B secure and running Kerberos and 
then you’d have to create some sort of trust between B and C,



I'd imagined making cluster B a kerberized cluster.

I don't think you need to go near trust relations though —ideally you'd just 
want the same accounts everywhere if you can, if not, the main thing is that 
the user submitting the job can get a credential for  that far NN at job 
submission time, and that credential is propagated all the way to the executors.


Did you mean turn on kerberos on the nodes in Cluster B so that each node 
becomes a trusted client that can connect to C

OR

Did you mean to turn on kerberos on the master node (eg edge node) where the 
data persists if you collect() it so its off the cluster on to a single machine 
and then push it from there so that only that machine has to have kerberos 
running and is a trusted server to Cluster C?


Note: In option 3, I hope I said it correctly, but I believe that you would be 
collecting the data to a client (edge node) before pushing it out to the 
secured cluster.





Does that make sense?

On Oct 14, 2016, at 1:32 PM, Steve Loughran <ste...@hortonworks.com> wrote:


On 13 Oct 2016, at 10:50, dbolshak <bolshakov.de...@gmail.com> wrote:

Hello community,

We've a challenge and no ideas how to solve it.

The problem,

Say we have the following environment:
1. `cluster A`, the cluster does not use kerberos and we use it as a source
of data, important thing is - we don't manage this cluster.
2. `cluster B`, small cluster where our spark application is running and
performing some logic. (we manage this cluster and it does not have
kerberos).
3. `cluster C`, the cluster uses kerberos and we use it to keep results of
our spark application, we manage this cluster

Our requrements and conditions that are not mentioned yet:
1. All clusters are in a single data center, but in the different
subnetworks.
2. We cannot turn on kerberos on `cluster A`
3. We cannot turn off kerberos on `cluster C`
4. We can turn on/off kerberos on `cluster B`, currently it's turned off.
5. Spark app is built on top of RDD and does not depend on spark-sql.

Does anybody know how to write data using RDD api to remote cluster which is
running with Kerberos?

If you want to talk to the secure cluster, C, from code running in cluster B, 
you'll need to turn kerberos on there. Maybe, maybe, you could just get away 
with kerberos being turned off, but you, the user, launching the application 
while logged in to kerberos yourself and so trusted by Cluster C.

one of the problems you are likely to hit with Spark here is that it's only 
going to collect the tokens you need to talk to HDFS at the time you launch the 
application, and by default, it only knows about the cluster FS. You will need 
to tell spark about the other filesystem at launch time, so it will know to 
authenticate with it as you, then collect the tokens needed for the application 
itself to work with kerberos.

spark.yarn.access.namenodes=hdfs://cluster-c:8080

-Steve

ps: https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/




Re: spark with kerberos

2016-10-14 Thread Steve Loughran

On 13 Oct 2016, at 10:50, dbolshak 
> wrote:

Hello community,

We've a challenge and no ideas how to solve it.

The problem,

Say we have the following environment:
1. `cluster A`, the cluster does not use kerberos and we use it as a source
of data, important thing is - we don't manage this cluster.
2. `cluster B`, small cluster where our spark application is running and
performing some logic. (we manage this cluster and it does not have
kerberos).
3. `cluster C`, the cluster uses kerberos and we use it to keep results of
our spark application, we manage this cluster

Our requrements and conditions that are not mentioned yet:
1. All clusters are in a single data center, but in the different
subnetworks.
2. We cannot turn on kerberos on `cluster A`
3. We cannot turn off kerberos on `cluster C`
4. We can turn on/off kerberos on `cluster B`, currently it's turned off.
5. Spark app is built on top of RDD and does not depend on spark-sql.

Does anybody know how to write data using RDD api to remote cluster which is
running with Kerberos?

If you want to talk to the secure cluster, C, from code running in cluster B, 
you'll need to turn kerberos on there. Maybe, maybe, you could just get away 
with kerberos being turned off, but you, the user, launching the application 
while logged in to kerberos yourself and so trusted by Cluster C.

one of the problems you are likely to hit with Spark here is that it's only 
going to collect the tokens you need to talk to HDFS at the time you launch the 
application, and by default, it only knows about the cluster FS. You will need 
to tell spark about the other filesystem at launch time, so it will know to 
authenticate with it as you, then collect the tokens needed for the application 
itself to work with kerberos.

spark.yarn.access.namenodes=hdfs://cluster-c:8080

-Steve

ps: https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/

