[jira] [Commented] (SPARK-20703) Add an operator for writing data out

2017-07-13 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085559#comment-16085559
 ] 

Steve Loughran commented on SPARK-20703:


Regarding a patch for this, what do people suggest as a good policy for failures?

FNFE: log@debug, return 0
IOE: log@info, full stack @debug, return 0

Or treat FNFE the same as any other IOE (which is most likely to be a transient 
network failure, given that by this point we know the user has connectivity, 
auth and write access to the bucket)? A sketch of both policies is at the end of this comment.

# I would like the option of printing that full stack, so that if the time 
comes to debug it, saying "edit this log setting" is enough to make problems 
visible in production.
# Yet I don't want to log the full stack as a default, as that can lead to 
panic, and it also tends to mean that when there are real problems in the code, 
people may just paste that stack in rather than one for the real problem.
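
For illustration, a minimal sketch of the two policies, assuming a hypothetical helper that stats the just-written file for the bytes-written metric; the object, method and logger names are placeholders, not the actual patch:

{code:java}
import java.io.{FileNotFoundException, IOException}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.slf4j.LoggerFactory

object WriteMetricsSketch {
  private val log = LoggerFactory.getLogger(getClass)

  /** Size of a just-written file, or 0 if it cannot be determined. */
  def writtenBytes(fs: FileSystem, path: Path): Long =
    try {
      fs.getFileStatus(path).getLen
    } catch {
      // Policy 1: FNFE means "not yet visible" on an eventually consistent
      // store; keep quiet about it and return 0.
      case e: FileNotFoundException =>
        log.debug("File not yet visible: {}", path, e)
        0L
      // Policy 2: any other IOE gets a short info line, full stack at debug,
      // so "edit this log setting" is enough to surface it in production.
      case e: IOException =>
        log.info("Failed to stat {} for metrics: {}", path, e.toString)
        log.debug("Full stack", e)
        0L
    }
}
{code}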

> Add an operator for writing data out
> 
>
> Key: SPARK-20703
> URL: https://issues.apache.org/jira/browse/SPARK-20703
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Reynold Xin
>Assignee: Liang-Chi Hsieh
> Fix For: 2.3.0
>
>
> We should add an operator for writing data out. Right now in the explain plan 
> / UI there is no way to tell whether a query is writing data out, and also 
> there is no way to associate metrics with data writes. It'd be tremendously 
> valuable to do this for adding metrics and for visibility.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21374) Reading globbed paths from S3 into DF doesn't work if filesystem caching is disabled

2017-07-13 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085645#comment-16085645
 ] 

Steve Loughran commented on SPARK-21374:


This is possibly a sign that your new configuration isn't having its auth 
values picked up, or they are incorrect (i.e. the properties are wrong). It works 
when caching is enabled because some other codepath has already set up an FS 
instance with the right properties, so when the DataFrame code runs, the 
previously configured instance is picked up.

# if using Hadoop 2.7.x JARs, switch to s3a and use s3a in the URLs & settings. 
You don't need to set the fs.s3a.impl field either; that's done for you.
# if you can upgrade to Hadoop 2.8 binaries, you can use per-bucket 
configuration; this does exactly what you want: it lets you configure different 
auth details for different buckets, without having to play these games (a sketch 
is at the end of this comment). See
[https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Configuring_different_S3_buckets]
 and 
[https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_cloud-data-access/content/s3-per-bucket-configs.html]


Going to 2.8 binaries (or anything with the feature backported to a 2.7.x 
variant) should solve your problem without you having to worry about what you 
are seeing here.
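
A hedged sketch of what per-bucket configuration looks like on Hadoop 2.8+, reusing {{sc}} and {{spark}} from the example above; bucket names and keys are placeholders:

{code:java}
// Per-bucket S3A credentials (Hadoop 2.8+). Bucket names are placeholders.
val hc = sc.hadoopConfiguration

// Defaults used for any bucket without an override.
hc.set("fs.s3a.access.key", "DEFAULT_ACCESS_KEY")
hc.set("fs.s3a.secret.key", "DEFAULT_SECRET_KEY")

// Overrides applied only when talking to s3a://private-bucket/ ...
hc.set("fs.s3a.bucket.private-bucket.access.key", "OTHER_ACCESS_KEY")
hc.set("fs.s3a.bucket.private-bucket.secret.key", "OTHER_SECRET_KEY")

// ...so both buckets can be read in one session without disabling the FS cache.
val df1 = spark.read.format("csv").load("s3a://public-bucket/file.csv")
val df2 = spark.read.format("csv").load("s3a://private-bucket/file.csv")
{code}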

> Reading globbed paths from S3 into DF doesn't work if filesystem caching is 
> disabled
> 
>
> Key: SPARK-21374
> URL: https://issues.apache.org/jira/browse/SPARK-21374
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.2, 2.1.1
>Reporter: Andrey Taptunov
>
> *Motivation:*
> In my case I want to disable filesystem cache to be able to change S3's 
> access key and secret key on the fly to read from buckets with different 
> permissions. This works perfectly fine for RDDs but doesn't work for DFs.
> *Example (works for RDD but fails for DataFrame):*
> {code:java}
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> object SimpleApp {
>   def main(args: Array[String]) {
> val awsAccessKeyId = "something"
> val awsSecretKey = "something else"
> val conf = new SparkConf().setAppName("Simple 
> Application").setMaster("local[*]")
> val sc = new SparkContext(conf)
> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", awsAccessKeyId)
> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", awsSecretKey)
> sc.hadoopConfiguration.setBoolean("fs.s3.impl.disable.cache",true)
> 
> sc.hadoopConfiguration.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> sc.hadoopConfiguration.set("fs.s3.buffer.dir","/tmp")
> val spark = SparkSession.builder().config(conf).getOrCreate()
> val rddFile = sc.textFile("s3://bucket/file.csv").count // ok
> val rddGlob = sc.textFile("s3://bucket/*").count // ok
> val dfFile = spark.read.format("csv").load("s3://bucket/file.csv").count 
> // ok
> 
> val dfGlob = spark.read.format("csv").load("s3://bucket/*").count 
> // IllegalArgumentExcepton. AWS Access Key ID and Secret Access Key must 
> be specified as the username or password (respectively)
> // of a s3 URL, or by setting the fs.s3.awsAccessKeyId or 
> fs.s3.awsSecretAccessKey properties (respectively).
>
> sc.stop()
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19790) OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure

2017-07-15 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088630#comment-16088630
 ] 

Steve Loughran commented on SPARK-19790:


I've now summarised the FileOutputCommitter v1 and v2 algorithms as far as I 
can understand them from the source and some debugger step-throughs; I think I'd 
need to add a few more tests to be really sure I understand it: 
[https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md]

h2. v1

# task commit: an atomic rename() of 
{{$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID}} to the completed 
dir {{$dest/_temporary/$appAttemptId/$taskAttemptID}}; job commit moves all data 
under each task attempt into $dest. Note that as only one rename() to that dest 
will work, race conditions in speculative work are prevented without the need 
for explicit coordination (see the sketch at the end of this comment)

# failure during task commit: delete the completed task directory (if any), 
rerun the task.
# job commit: list completed tasks, move/merge them into the dest dir. Not 
atomic; the rename operation count is per task and directory tree. A failure in 
job commit is not recoverable.
# failure during job commit: job in unknown state, rm $dest and rerun

# job restart: after failure of the entire job, it can be restarted, with the 
restarted job using the already-completed tasks. Provided rename() is atomic 
there's a guarantee that every task's completed dir is valid; provided it's 
O(1), it's an inexpensive operation

h2. v2

# task commit: copy straight to dest (this is where co-ordination is needed 
between tasks and job manager) 
# job commit: no-op
# job restart: none, start again
# failure during task commit: dest dir in unknown state, job restart needed
# failure during job commit: job in unknown state, rm $dest and rerun

Given that Spark doesn't do job restart, switching to v2 everywhere reduces the 
number of renames, but makes recovery from failure during task commit impossible.

Neither algorithm is correct on an inconsistent S3 endpoint, as they both can 
get incorrect listings of the files to COPY + DELETE. With consistency, you still 
have O(data) task commits in both algorithms, and another O(data) job commit 
with v1. Azure WASB is consistent, and uses leases for exclusive access to bits 
of the store on commit, but even it can do with a store-specific committer. 
Rename is not the solution to committing data in an object store.
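
To make the v1 task-commit rename concrete, a hedged sketch with placeholder attempt IDs and a simplified layout (not lifted from the committer source):

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val dest = new Path("s3a://bucket/output")   // final job destination
val appAttempt = "app-attempt-01"            // placeholder attempt IDs
val taskAttempt = "task-attempt-007"

// Where the task wrote its files while running:
val taskAttemptDir = new Path(dest, s"_temporary/$appAttempt/_temporary/$taskAttempt")
// Where a *committed* task's output lives until job commit:
val committedTaskDir = new Path(dest, s"_temporary/$appAttempt/$taskAttempt")

val fs = dest.getFileSystem(new Configuration())
// v1 task commit: a single rename; only one attempt can land on that dest,
// which is what keeps speculative duplicates out on a real filesystem.
// On S3A this "rename" is a COPY + DELETE of every object, hence O(data).
fs.rename(taskAttemptDir, committedTaskDir)
// v1 job commit then merges every committedTaskDir under $dest.
{code}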



> OutputCommitCoordinator should not allow another task to commit after an 
> ExecutorFailure
> 
>
> Key: SPARK-19790
> URL: https://issues.apache.org/jira/browse/SPARK-19790
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Imran Rashid
>
> The OutputCommitCoordinator resets the allowed committer when the task fails. 
>  
> https://github.com/apache/spark/blob/8aa560b75e6b083b2a890c52301414285ba35c3d/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala#L143
> However, if a task fails because of an ExecutorFailure, we actually have no 
> idea what the status is of the task.  The task may actually still be running, 
> and perhaps successfully commit its output.  By allowing another task to 
> commit its output, there is a chance that multiple tasks commit, which can 
> result in corrupt output.  This would be particularly problematic when 
> commit() is an expensive operation, eg. moving files on S3.
> For other task failures, we can allow other tasks to commit.  But with an 
> ExecutorFailure, its not clear what the right thing to do is.  The only safe 
> thing to do may be to fail the job.
> This is related to SPARK-19631, and was discovered during discussion on that 
> PR https://github.com/apache/spark/pull/16959#discussion_r103549134



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org




[jira] [Commented] (SPARK-20703) Add an operator for writing data out

2017-07-15 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088632#comment-16088632
 ] 

Steve Loughran commented on SPARK-20703:


...got a patch for this, but I want to see if I can create a unit test for it: an 
OutputWriter that temporarily renamed the output file would do it, but then it 
would need to rename it back again before the next stage in the process. Or maybe 
not: that's what tests are for. Expect a PR in August.

> Add an operator for writing data out
> 
>
> Key: SPARK-20703
> URL: https://issues.apache.org/jira/browse/SPARK-20703
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Reynold Xin
>Assignee: Liang-Chi Hsieh
> Fix For: 2.3.0
>
>
> We should add an operator for writing data out. Right now in the explain plan 
> / UI there is no way to tell whether a query is writing data out, and also 
> there is no way to associate metrics with data writes. It'd be tremendously 
> valuable to do this for adding metrics and for visibility.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20107) Add spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version option to configuration.md

2017-07-04 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16073951#comment-16073951
 ] 

Steve Loughran commented on SPARK-20107:


If you are curious, I've just written out the v1 and v2 commit algorithms with 
cost/complexity estimates: 
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
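
For anyone landing here from a search, a usage sketch of the setting this JIRA documents; any spark.hadoop.* option is copied into the Hadoop configuration, so the property ends up where the committer reads it:

{code:java}
import org.apache.spark.sql.SparkSession

// Sketch: select the v2 commit algorithm for a session.
val spark = SparkSession.builder()
  .appName("commit-v2-example")
  // Becomes mapreduce.fileoutputcommitter.algorithm.version=2 in the
  // Hadoop Configuration used by HadoopMapReduceCommitProtocol.
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()
{code}

The same line can go in spark-defaults.conf instead of code.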



> Add spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version option to 
> configuration.md
> ---
>
> Key: SPARK-20107
> URL: https://issues.apache.org/jira/browse/SPARK-20107
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.1.0
>Reporter: Yuming Wang
>Assignee: Yuming Wang
>Priority: Trivial
> Fix For: 2.2.0
>
>
> Set {{spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2}} can 
> speed up 
> [HadoopMapReduceCommitProtocol#commitJob|https://github.com/apache/spark/blob/v2.1.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L121]
>  for many output files. 
> It can speed up {{11 minutes}} for 216869 output files:
> {code:sql}
> CREATE TABLE tmp.spark_20107 AS SELECT
>   category_id,
>   product_id,
>   track_id,
>   concat(
> substr(ds, 3, 2),
> substr(ds, 6, 2),
> substr(ds, 9, 2)
>   ) shortDate,
>   CASE WHEN actiontype = '0' THEN 'browse' WHEN actiontype = '1' THEN 'fav' 
> WHEN actiontype = '2' THEN 'cart' WHEN actiontype = '3' THEN 'order' ELSE 
> 'invalid actio' END AS type
> FROM
>   tmp.user_action
> WHERE
>   ds > date_sub('2017-01-23', 730)
> AND actiontype IN ('0','1','2','3');
> {code}
> {code}
> $ hadoop fs -ls /user/hive/warehouse/tmp.db/spark_20107 | wc -l
> 216870
> {code}
> We should add this option to 
> [configuration.md|http://spark.apache.org/docs/latest/configuration.html].
> All cloudera's hadoop 2.6.0-cdh5.4.0 or higher versions(see: 
> [cloudera/hadoop-common@1c12361|https://github.com/cloudera/hadoop-common/commit/1c1236182304d4075276c00c4592358f428bc433]
>  and 
> [cloudera/hadoop-common@16b2de2|https://github.com/cloudera/hadoop-common/commit/16b2de27321db7ce2395c08baccfdec5562017f0])
>  and apache's hadoop 2.7.0 or higher versions support this improvement.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12868) ADD JAR via sparkSQL JDBC will fail when using a HDFS URL

2017-06-29 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068301#comment-16068301
 ] 

Steve Loughran commented on SPARK-12868:


It's actually not the cause of that, merely the messenger. The cause is 
HADOOP-14383: a combination of Spark 2.2 & Hadoop 2.9+ will trigger the 
problem. The fix belongs in Hadoop.

> ADD JAR via sparkSQL JDBC will fail when using a HDFS URL
> -
>
> Key: SPARK-12868
> URL: https://issues.apache.org/jira/browse/SPARK-12868
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Trystan Leftwich
>Assignee: Weiqing Yang
> Fix For: 2.2.0
>
>
> When trying to add a jar with a HDFS URI, i.E
> {code:sql}
> ADD JAR hdfs:///tmp/foo.jar
> {code}
> Via the spark sql JDBC interface it will fail with:
> {code:sql}
> java.net.MalformedURLException: unknown protocol: hdfs
> at java.net.URL.(URL.java:593)
> at java.net.URL.(URL.java:483)
> at java.net.URL.(URL.java:432)
> at java.net.URI.toURL(URI.java:1089)
> at 
> org.apache.spark.sql.hive.client.ClientWrapper.addJar(ClientWrapper.scala:578)
> at org.apache.spark.sql.hive.HiveContext.addJar(HiveContext.scala:652)
> at org.apache.spark.sql.hive.execution.AddJar.run(commands.scala:89)
> at 
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
> at 
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
> at 
> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at 
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
> at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
> at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
> at org.apache.spark.sql.DataFrame.(DataFrame.scala:145)
> at org.apache.spark.sql.DataFrame.(DataFrame.scala:130)
> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
> at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
> at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:211)
> at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:154)
> at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:151)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
> at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:164)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21137) Spark reads many small files slowly

2017-06-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065423#comment-16065423
 ] 

Steve Loughran commented on SPARK-21137:


Looking at this.

Something is trying to get the permissions for every file, which is being dealt 
with by an exec() and all the overheads of that. Looking at the code, it's in the 
constructor of {{LocatedFileStatus}}, which is building it from another 
{{FileStatus}}. That is normally just a simple copy of a field (fast, 
efficient); on RawLocalFileSystem it actually triggers an on-demand 
execution. This has been around for a long time (HADOOP-2288), and surfaces here 
because you're working with the local FS. For all other filesystems it's a quick 
operation.

I think this is an issue: I don't think anybody thought this would be a 
problem, as it's just viewed as a marshalling of a LocatedFileStatus, which is 
what you get back from {{FileSystem.listLocatedStatus}}. Normally that's the 
higher performing one, not just on object stores, but because it scales better, 
being able to incrementally send back data in batches, rather than needing to 
enumerate an entire directory of files (possibly in the millions) and then send 
them around as arrays of FileStatus.  Here, it's clearly not.

What to do? I think we could consider whether it'd be possible to add this to 
the Hadoop native libs & so make it a fast API call. There's also the option of 
"allowing us to completely disable permissions entirely". That one appeals to 
me more from a Windows perspective, where you could get rid of the Hadoop 
native lib and still have (most) things work there...but as it's an incomplete 
"most" it's probably an optimistic goal.



> Spark reads many small files slowly
> ---
>
> Key: SPARK-21137
> URL: https://issues.apache.org/jira/browse/SPARK-21137
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: sam
>Priority: Minor
>
> A very common use case in big data is to read a large number of small files.  
> For example the Enron email dataset has 1,227,645 small files.
> When one tries to read this data using Spark one will hit many issues.  
> Firstly, even if the data is small (each file only say 1K) any job can take a 
> very long time (I have a simple job that has been running for 3 hours and has 
> not yet got to the point of starting any tasks, I doubt if it will ever 
> finish).
> It seems all the code in Spark that manages file listing is single threaded 
> and not well optimised.  When I hand crank the code and don't use Spark, my 
> job runs much faster.
> Is it possible that I'm missing some configuration option? It seems kinda 
> surprising to me that Spark cannot read Enron data given that it's such a 
> quintessential example.
> So it takes 1 hour to output a line "1,227,645 input paths to process", it 
> then takes another hour to output the same line. Then it outputs a CSV of all 
> the input paths (so creates a text storm).
> Now it's been stuck on the following:
> {code}
> 17/06/19 09:31:07 INFO LzoCodec: Successfully loaded & initialized native-lzo 
> library [hadoop-lzo rev 154f1ef53e2d6ed126b0957d7995e0a610947608]
> {code}
> for 2.5 hours.
> So I've provided full reproduce steps here (including code and cluster setup) 
> https://github.com/samthebest/scenron, scroll down to "Bug In Spark". You can 
> easily just clone, and follow the README to reproduce exactly!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21137) Spark reads many small files slowly

2017-06-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065445#comment-16065445
 ] 

Steve Loughran commented on SPARK-21137:


Filed HADOOP-14600.

Looks like a very old codepath that's not been looked at in a while...and since 
then the native lib fstat call should be able to do this; just retain the old 
code for when {{NativeCodeLoader.isNativeCodeLoaded() == false}}. 



Sam, you said

bq. it's likely that the underlying Hadoop APIs have some yucky code that does 
something silly, I have delved down their before and my stomach cannot handle it

Oh, it's not so bad. At least you don't have to go near the assembly code bit, 
which we are all scared of. Or worse, Kerberos.

Anyway, patches welcome there, with tests.

> Spark reads many small files slowly
> ---
>
> Key: SPARK-21137
> URL: https://issues.apache.org/jira/browse/SPARK-21137
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: sam
>Priority: Minor
>
> A very common use case in big data is to read a large number of small files.  
> For example the Enron email dataset has 1,227,645 small files.
> When one tries to read this data using Spark one will hit many issues.  
> Firstly, even if the data is small (each file only say 1K) any job can take a 
> very long time (I have a simple job that has been running for 3 hours and has 
> not yet got to the point of starting any tasks, I doubt if it will ever 
> finish).
> It seems all the code in Spark that manages file listing is single threaded 
> and not well optimised.  When I hand crank the code and don't use Spark, my 
> job runs much faster.
> Is it possible that I'm missing some configuration option? It seems kinda 
> surprising to me that Spark cannot read Enron data given that it's such a 
> quintessential example.
> So it takes 1 hour to output a line "1,227,645 input paths to process", it 
> then takes another hour to output the same line. Then it outputs a CSV of all 
> the input paths (so creates a text storm).
> Now it's been stuck on the following:
> {code}
> 17/06/19 09:31:07 INFO LzoCodec: Successfully loaded & initialized native-lzo 
> library [hadoop-lzo rev 154f1ef53e2d6ed126b0957d7995e0a610947608]
> {code}
> for 2.5 hours.
> So I've provided full reproduce steps here (including code and cluster setup) 
> https://github.com/samthebest/scenron, scroll down to "Bug In Spark". You can 
> easily just clone, and follow the README to reproduce exactly!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21137) Spark reads many small files slowly

2017-06-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065428#comment-16065428
 ] 

Steve Loughran commented on SPARK-21137:


PS: for now, do it in parallel with 
{{mapreduce.input.fileinputformat.list-status.num-threads}}. Even there, though, 
the fact that every thread will be exec()ing code can make it expensive (it looks 
like a full POSIX spawn on a Mac). 
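
A hedged sketch of that workaround, assuming an existing SparkContext {{sc}} and a placeholder local path; the thread count is illustrative:

{code:java}
// Fan out the input-path listing across multiple threads.
sc.hadoopConfiguration.set(
  "mapreduce.input.fileinputformat.list-status.num-threads", "16")

// Then read as usual; FileInputFormat's listing will use that thread pool.
val count = sc.textFile("file:///data/enron/*").count()
{code}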



> Spark reads many small files slowly
> ---
>
> Key: SPARK-21137
> URL: https://issues.apache.org/jira/browse/SPARK-21137
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: sam
>Priority: Minor
>
> A very common use case in big data is to read a large number of small files.  
> For example the Enron email dataset has 1,227,645 small files.
> When one tries to read this data using Spark one will hit many issues.  
> Firstly, even if the data is small (each file only say 1K) any job can take a 
> very long time (I have a simple job that has been running for 3 hours and has 
> not yet got to the point of starting any tasks, I doubt if it will ever 
> finish).
> It seems all the code in Spark that manages file listing is single threaded 
> and not well optimised.  When I hand crank the code and don't use Spark, my 
> job runs much faster.
> Is it possible that I'm missing some configuration option? It seems kinda 
> surprising to me that Spark cannot read Enron data given that it's such a 
> quintessential example.
> So it takes 1 hour to output a line "1,227,645 input paths to process", it 
> then takes another hour to output the same line. Then it outputs a CSV of all 
> the input paths (so creates a text storm).
> Now it's been stuck on the following:
> {code}
> 17/06/19 09:31:07 INFO LzoCodec: Successfully loaded & initialized native-lzo 
> library [hadoop-lzo rev 154f1ef53e2d6ed126b0957d7995e0a610947608]
> {code}
> for 2.5 hours.
> So I've provided full reproduce steps here (including code and cluster setup) 
> https://github.com/samthebest/scenron, scroll down to "Bug In Spark". You can 
> easily just clone, and follow the README to reproduce exactly!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12868) ADD JAR via sparkSQL JDBC will fail when using a HDFS URL

2017-06-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065448#comment-16065448
 ] 

Steve Loughran commented on SPARK-12868:


I think this is the case of HADOOP-14598: once the FS has been set to 
{{FsUrlStreamHandlerFactory}} in {{org.apache.spark.sql.internal.SharedState}}, 
you can't talk to Azure. 

> ADD JAR via sparkSQL JDBC will fail when using a HDFS URL
> -
>
> Key: SPARK-12868
> URL: https://issues.apache.org/jira/browse/SPARK-12868
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Trystan Leftwich
>Assignee: Weiqing Yang
> Fix For: 2.2.0
>
>
> When trying to add a jar with a HDFS URI, i.E
> {code:sql}
> ADD JAR hdfs:///tmp/foo.jar
> {code}
> Via the spark sql JDBC interface it will fail with:
> {code:sql}
> java.net.MalformedURLException: unknown protocol: hdfs
> at java.net.URL.(URL.java:593)
> at java.net.URL.(URL.java:483)
> at java.net.URL.(URL.java:432)
> at java.net.URI.toURL(URI.java:1089)
> at 
> org.apache.spark.sql.hive.client.ClientWrapper.addJar(ClientWrapper.scala:578)
> at org.apache.spark.sql.hive.HiveContext.addJar(HiveContext.scala:652)
> at org.apache.spark.sql.hive.execution.AddJar.run(commands.scala:89)
> at 
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
> at 
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
> at 
> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at 
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
> at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
> at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
> at org.apache.spark.sql.DataFrame.(DataFrame.scala:145)
> at org.apache.spark.sql.DataFrame.(DataFrame.scala:130)
> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
> at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
> at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:211)
> at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:154)
> at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:151)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
> at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:164)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21137) Spark reads many small files slowly

2017-06-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065515#comment-16065515
 ] 

Steve Loughran commented on SPARK-21137:


bq. so it is something that could be optimized in the Hadoop API, and seems 
somewhat specific to a local filesystem.

Yeah, it's something unique to the local FS (maybe also things which extend it, 
like glusterfs, [~jayunit100]?). The fix would be straightforward if someone goes 
down to it and writes the test. Ideally a scale one which creates 1K+ small files 
and then measures the difference in listStatus times before/after.

> Spark reads many small files slowly
> ---
>
> Key: SPARK-21137
> URL: https://issues.apache.org/jira/browse/SPARK-21137
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: sam
>Priority: Minor
>
> A very common use case in big data is to read a large number of small files.  
> For example the Enron email dataset has 1,227,645 small files.
> When one tries to read this data using Spark one will hit many issues.  
> Firstly, even if the data is small (each file only say 1K) any job can take a 
> very long time (I have a simple job that has been running for 3 hours and has 
> not yet got to the point of starting any tasks, I doubt if it will ever 
> finish).
> It seems all the code in Spark that manages file listing is single threaded 
> and not well optimised.  When I hand crank the code and don't use Spark, my 
> job runs much faster.
> Is it possible that I'm missing some configuration option? It seems kinda 
> surprising to me that Spark cannot read Enron data given that it's such a 
> quintessential example.
> So it takes 1 hour to output a line "1,227,645 input paths to process", it 
> then takes another hour to output the same line. Then it outputs a CSV of all 
> the input paths (so creates a text storm).
> Now it's been stuck on the following:
> {code}
> 17/06/19 09:31:07 INFO LzoCodec: Successfully loaded & initialized native-lzo 
> library [hadoop-lzo rev 154f1ef53e2d6ed126b0957d7995e0a610947608]
> {code}
> for 2.5 hours.
> So I've provided full reproduce steps here (including code and cluster setup) 
> https://github.com/samthebest/scenron, scroll down to "Bug In Spark". You can 
> easily just clone, and follow the README to reproduce exactly!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7481) Add spark-hadoop-cloud module to pull in object store support

2017-04-26 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984771#comment-15984771
 ] 

Steve Loughran commented on SPARK-7481:
---

I think we ended up going in circles on that PR. Sean has actually been very 
tolerant of me; however, it's been hampered by my full-time focus on other 
things. I've only had time to work on the Spark PR intermittently and 
that's been hard for everyone: me in the rebase/retest, the one reviewer in having 
to catch up again.

Now, anyone who does manage to get that classpath right will discover that S3A 
absolutely flies with Spark: in partitioning (list file improvements), data 
input (set the fadvise input policy for ORC and Parquet), and output (enable the 
fast upload path, play with the pool options); the specific settings are sketched 
below. It delivers that performance because this patch sets things up for the 
integration tests downstream of it, so I and others can be confident that the 
things actually work, at speed, at scale. Indeed, much of the S3A performance 
work was actually based on Hive and Spark workloads: the data formats & their 
seek patterns, directory layouts, file generation. All that's left is the little 
problem of getting the classpath right. Oh, and the committer.
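
For reference, a hedged sketch of those options with the Hadoop 2.8 S3A property names; the values are illustrative starting points, and {{sc}} is assumed to be an existing SparkContext:

{code:java}
val hc = sc.hadoopConfiguration
// Random-access reads for columnar formats (ORC, Parquet):
hc.set("fs.s3a.experimental.input.fadvise", "random")
// Incremental multipart uploads on write:
hc.set("fs.s3a.fast.upload", "true")
// Thread and connection pools used by those uploads:
hc.set("fs.s3a.threads.max", "20")
hc.set("fs.s3a.connection.maximum", "30")
{code}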


For now, for people's enjoyment, here are some videos from Spark Summit East on 
the topic:

* [Spark and object stores|https://youtu.be/8F2Jqw5_OnI]. 
* [Robust and Scalable etl over Cloud Storage With 
Spark|https://spark-summit.org/east-2017/events/robust-and-scalable-etl-over-cloud-storage-with-spark/]



> Add spark-hadoop-cloud module to pull in object store support
> -
>
> Key: SPARK-7481
> URL: https://issues.apache.org/jira/browse/SPARK-7481
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.1.0
>Reporter: Steve Loughran
>
> To keep the s3n classpath right, to add s3a, swift & azure, the dependencies 
> of spark in a 2.6+ profile need to add the relevant object store packages 
> (hadoop-aws, hadoop-openstack, hadoop-azure)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7481) Add spark-hadoop-cloud module to pull in object store support

2017-04-26 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985040#comment-15985040
 ] 

Steve Loughran commented on SPARK-7481:
---

(This is a fairly long comment, but it tries to summarise the entire state of 
interaction with object stores, especially S3A on Hadoop 2.8+. Azure is simpler; 
GCS is Google's problem; Swift is not used very much.)

If you look at object store & Spark (or indeed, any code which uses a 
filesystem as the source and dest of work), there are problems which can 
generally be grouped into various categories.

h3. Foundational: talking to the object stores

Classpath & execution: can you wire the JARs up? A longstanding issue in ASF 
Spark releases (SPARK-5348, SPARK-12557). This was exacerbated by the movement 
of s3n:// to the hadoop-aws package (FWIW, I hadn't noticed that move; I'd have 
blocked it if I'd been paying attention). This includes transitive problems 
(SPARK-11413).

Credential propagation. Spark's env var propagation is pretty cute here; 
SPARK-19739 picks up {{AWS_SESSION_TOKEN}} too. Diagnostics on failure are a 
real pain.


h3. Observable Inconsistencies leading to Data loss

Generally where the metaphor "it's just a filesystem" fails. These are bad 
because they often "just work", especially in dev & test with small datasets, 
and when they go wrong, they can fail by generating bad results *and nobody 
notices*.

* Expectations of consistent listing of "directories". S3Guard (HADOOP-13345) 
deals with this, as can Netflix's S3mper and AWS's premium Dynamo-backed S3 
storage.
* Expectations on the transacted nature of Directory renames, the core atomic 
commit operations against full filesystems.
* Expectations that when things are deleted they go away. This does become 
visible sometimes, usually in checks for a destination not existing 
(SPARK-19013)
* Expectations that write-in-progress data is visible/flushed, that {{close()}} 
is low cost. SPARK-19111.

Committing pretty much combines all of these, see below for more details.

h3. Aggressively bad performance

That's the mismatch between what the object store offers, what the apps expect, 
and the metaphor work done in the Hadoop FileSystem implementations, which, in 
trying to hide the conceptual mismatch, can actually amplify the problem. 
Example: directory tree scanning at the start of a query. The mock directory 
structure allows callers to do treewalks, when really a full list of all 
children can be done as a direct O(1) call. SPARK-17159 covers some of this for 
scanning directories in Spark Streaming, but there's a hidden tree walk in 
every call to {{FileSystem.globStatus()}} (HADOOP-13371). Given how S3Guard 
transforms this treewalk, and you need it for consistency anyway, that's probably 
the best solution for now. Although I have a PoC which does a full List **/* 
followed by a filter, that's not viable when you have a wide, deep tree and do 
need to prune aggressively.
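
As a sketch of that list-then-filter approach (placeholder paths, and not the PoC itself):

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import scala.collection.mutable.ArrayBuffer

val root = new Path("s3a://bucket/logs")
val fs = root.getFileSystem(new Configuration())

// One recursive listing call; object stores can serve this as a flat paged
// LIST rather than one request per "directory".
val it = fs.listFiles(root, true)
val matches = ArrayBuffer[LocatedFileStatus]()
while (it.hasNext) {
  val st = it.next()
  if (st.getPath.getName.endsWith(".csv")) matches += st
}
// Fine for a shallow filter; not viable when the tree is wide and deep and
// aggressive pruning is needed, as noted above.
{code}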

Checkpointing to object stores is similar: it's generally not dangerous to do 
the write+rename, just adds the copy overhead, consistency issues 
notwithstanding.


h3. Suboptimal code. 

There are opportunities for speedup, but if it's not on the critical path, it's 
not worth the hassle. That said, as every call to {{getFileStatus()}} can take 
hundreds of millis, things get onto the critical path quite fast. Examples: 
checking for a file existing before calling {{fs.delete(path)}} (the delete is 
always a no-op if the dest path isn't there), and the equivalent on mkdirs: {{if 
(!fs.exists(dir)) fs.mkdirs(dir)}}. Hadoop 3.0 will help steer people onto the 
path of righteousness there by deprecating a couple of methods which encourage 
inefficiencies (isFile/isDir).
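
A small sketch of the pattern and its cheaper replacement, with a placeholder path:

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val path = new Path("s3a://bucket/tmp/work")   // placeholder path
val fs = path.getFileSystem(new Configuration())

// Extra round trip on an object store: the exists() probe is redundant.
if (fs.exists(path)) {
  fs.delete(path, true)
}

// Cheaper: delete() simply returns false if nothing was there.
fs.delete(path, true)

// Likewise mkdirs() succeeds if the directory already exists; no probe needed.
fs.mkdirs(path)
{code}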


h3. The commit problem 

The full commit problem combines all of these: you need a consistent listing of 
the source data, your deleted destination path mustn't appear in listings, the 
commit of each task must promote that task's work to the pending output of the 
job, and an abort must leave no trace of it. The final job commit must place data 
into the final destination; again, a job abort must not make any output visible. 
There's some ambiguity about what happens if a task or job commit fails; 
generally the safest answer is "abort everything". Furthermore, nobody has any 
idea what to do if an {{abort()}} raises exceptions. Oh, and all of this must be 
fast. Spark is no better or worse than the core MapReduce committers here, or 
those of Hive.

Spark generally uses the Hadoop {{FileOutputFormat}} via the 
{{HadoopMapReduceCommitProtocol}}, directly or indirectly (e.g. 
{{ParquetOutputFormat}}), extracting its committer and casting it to 
{{FileOutputCommitter}}, primarily to get a working directory. This committer 
assumes the destination is a consistent FS and uses renames when promoting task 
and job output, assuming those are so fast it doesn't even bother to log an 
"about to rename" message. Hence the recurrent Stack Overflow 

[jira] [Commented] (SPARK-17159) Improve FileInputDStream.findNewFiles list performance

2017-04-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981081#comment-15981081
 ] 

Steve Loughran commented on SPARK-17159:


Pulled the documentation out into a separate JIRA, SPARK-20448, for independent review.

> Improve FileInputDStream.findNewFiles list performance
> --
>
> Key: SPARK-17159
> URL: https://issues.apache.org/jira/browse/SPARK-17159
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.0
> Environment: spark against object stores
>Reporter: Steve Loughran
>Priority: Minor
>
> {{FileInputDStream.findNewFiles()}} is doing a globStatus with a filter that 
> calls getFileStatus() on every file, takes the output and does listStatus() 
> on the output.
> This going to suffer on object stores, as dir listing and getFileStatus calls 
> are so expensive. It's clear this is a problem, as the method has code to 
> detect timeouts in the window and warn of problems.
> It should be possible to make this faster



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20448) Document how FileInputDStream works with object storage

2017-04-24 Thread Steve Loughran (JIRA)
Steve Loughran created SPARK-20448:
--

 Summary: Document how FileInputDStream works with object storage
 Key: SPARK-20448
 URL: https://issues.apache.org/jira/browse/SPARK-20448
 Project: Spark
  Issue Type: Documentation
  Components: Documentation
Affects Versions: 2.1.0
Reporter: Steve Loughran
Priority: Minor


Object stores work differently from filesystems: intermediate writes are not 
visible, and renames are really O(data) copies, not O(1) transactions.

This makes working with them as DStreams fundamentally different: you can write 
straight into the destination.

1. Document how FileInputDStream scans directories for changes.
2. Document how object stores behave differently, and the implications for users.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21374) Reading globbed paths from S3 into DF doesn't work if filesystem caching is disabled

2017-08-01 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108897#comment-16108897
 ] 

Steve Loughran commented on SPARK-21374:


I understand... the patch shows the issue. It's only working in some codepaths 
because the (authenticated) S3 FS instance was already created and cached.

> Reading globbed paths from S3 into DF doesn't work if filesystem caching is 
> disabled
> 
>
> Key: SPARK-21374
> URL: https://issues.apache.org/jira/browse/SPARK-21374
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.2, 2.1.1
>Reporter: Andrey Taptunov
>
> *Motivation:*
> In my case I want to disable filesystem cache to be able to change S3's 
> access key and secret key on the fly to read from buckets with different 
> permissions. This works perfectly fine for RDDs but doesn't work for DFs.
> *Example (works for RDD but fails for DataFrame):*
> {code:java}
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> object SimpleApp {
>   def main(args: Array[String]) {
> val awsAccessKeyId = "something"
> val awsSecretKey = "something else"
> val conf = new SparkConf().setAppName("Simple 
> Application").setMaster("local[*]")
> val sc = new SparkContext(conf)
> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", awsAccessKeyId)
> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", awsSecretKey)
> sc.hadoopConfiguration.setBoolean("fs.s3.impl.disable.cache",true)
> 
> sc.hadoopConfiguration.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> sc.hadoopConfiguration.set("fs.s3.buffer.dir","/tmp")
> val spark = SparkSession.builder().config(conf).getOrCreate()
> val rddFile = sc.textFile("s3://bucket/file.csv").count // ok
> val rddGlob = sc.textFile("s3://bucket/*").count // ok
> val dfFile = spark.read.format("csv").load("s3://bucket/file.csv").count 
> // ok
> 
> val dfGlob = spark.read.format("csv").load("s3://bucket/*").count 
> // IllegalArgumentExcepton. AWS Access Key ID and Secret Access Key must 
> be specified as the username or password (respectively)
> // of a s3 URL, or by setting the fs.s3.awsAccessKeyId or 
> fs.s3.awsSecretAccessKey properties (respectively).
>
> sc.stop()
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21514) Hive has updated with new support for S3 and InsertIntoHiveTable.scala should update also

2017-08-01 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108882#comment-16108882
 ] 

Steve Loughran commented on SPARK-21514:


Can you link this JIRA to the specific HIVE work?

> Hive has updated with new support for S3 and InsertIntoHiveTable.scala should 
> update also
> -
>
> Key: SPARK-21514
> URL: https://issues.apache.org/jira/browse/SPARK-21514
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Javier Ros
>
> Hive has updated adding new parameters to optimize the usage of S3, now you 
> can avoid the usage of S3 as the stagingdir using the parameters 
> hive.blobstore.supported.schemes & hive.blobstore.optimizations.enabled.
> The InsertIntoHiveTable.scala file should be updated with the same 
> improvement to match the behavior of Hive.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21618) http(s) not accepted in spark-submit jar uri

2017-08-04 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114379#comment-16114379
 ] 

Steve Loughran commented on SPARK-21618:


Yes, and that 2.9+ feature breaks things, because when you ask for an http or 
https connection, you get back some Hadoop wrapper class, which is not what 
other code (e.g. WASB) wants... their attempts to cast it to the normal 
java.io base class fail. This doesn't surface on any shipping Hadoop release (or 
HDP/CDH/EMR) & a fix is in progress.

> http(s) not accepted in spark-submit jar uri
> 
>
> Key: SPARK-21618
> URL: https://issues.apache.org/jira/browse/SPARK-21618
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy
>Affects Versions: 2.1.1, 2.2.0
> Environment: pre-built for hadoop 2.6 and 2.7 on mac and ubuntu 
> 16.04. 
>Reporter: Ben Mayne
>Priority: Minor
>  Labels: documentation
>
> The documentation suggests I should be able to use an http(s) uri for a jar 
> in spark-submit, but I haven't been successful 
> https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management
> {noformat}
> benmayne@Benjamins-MacBook-Pro ~ $ spark-submit --deploy-mode client --master 
> local[2] --class class.name.Test https://test.com/path/to/jar.jar
> log4j:WARN No appenders could be found for logger 
> (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> Exception in thread "main" java.io.IOException: No FileSystem for scheme: 
> https
>   at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
>   at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
>   at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>   at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
>   at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
>   at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>   at 
> org.apache.spark.deploy.SparkSubmit$.downloadFile(SparkSubmit.scala:865)
>   at 
> org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$1.apply(SparkSubmit.scala:316)
>   at 
> org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$1.apply(SparkSubmit.scala:316)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:316)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> benmayne@Benjamins-MacBook-Pro ~ $
> {noformat}
> If I replace the path with a valid hdfs path 
> (hdfs:///user/benmayne/valid-jar.jar), it works as expected. I've seen the 
> same behavior across 2.2.0 (hadoop 2.6 & 2.7 on mac and ubuntu) and on 2.1.1 
> on ubuntu. 
> this is the example that I'm trying to replicate from 
> https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management:
>  
> > Spark uses the following URL scheme to allow different strategies for 
> > disseminating jars:
> > file: - Absolute paths and file:/ URIs are served by the driver’s HTTP file 
> > server, and every executor pulls the file from the driver HTTP server.
> > hdfs:, http:, https:, ftp: - these pull down files and JARs from the URI as 
> > expected
> {noformat}
> # Run on a Mesos cluster in cluster deploy mode with supervise
> ./bin/spark-submit \
>   --class org.apache.spark.examples.SparkPi \
>   --master mesos://207.184.161.138:7077 \
>   --deploy-mode cluster \
>   --supervise \
>   --executor-memory 20G \
>   --total-executor-cores 100 \
>   http://path/to/examples.jar \
>   1000
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21762) FileFormatWriter metrics collection fails if a newly close()d file isn't yet visible

2017-08-17 Thread Steve Loughran (JIRA)
Steve Loughran created SPARK-21762:
--

 Summary: FileFormatWriter metrics collection fails if a newly 
close()d file isn't yet visible
 Key: SPARK-21762
 URL: https://issues.apache.org/jira/browse/SPARK-21762
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
 Environment: object stores without complete creation consistency (this 
includes AWS S3's caching of negative GET results)
Reporter: Steve Loughran
Priority: Minor


The metrics collection of SPARK-20703 can trigger premature failure if the 
newly written object isn't actually visible yet, that is if, after 
{{writer.close()}}, a {{getFileStatus(path)}} returns a 
{{FileNotFoundException}}.

Strictly speaking, not having a file immediately visible goes against the 
fundamental expectations of the Hadoop FS APIs, namely fully consistent data & 
metadata across all operations, with immediate global visibility of all changes. 
However, not all object stores make that guarantee, be it only for newly created 
data or for updated blobs. And so spurious FNFEs can get raised, ones which 
*should* have gone away by the time the actual task is committed. Or if they 
haven't, the job is in deep trouble anyway.

What to do?
# leave as is: fail fast & so catch blobstores/blobstore clients which don't 
behave as required. One issue here: will that trigger retries, what happens 
there, etc, etc.
# Swallow the FNFE and hope the file is observable later.
# Swallow all IOEs and hope that whatever problem the FS has is transient.

Options 2 & 3 aren't going to collect metrics in the event of a FNFE, or at 
least, not the counter of bytes written.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20370) create external table on read only location fails

2017-05-02 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993405#comment-15993405
 ] 

Steve Loughran commented on SPARK-20370:


Is this the bit under the PR tagged "!! HACK ALERT !!" by any chance? If so, it 
seems to have gone in for a Hive metastore workaround. I wonder if there is/can 
be a solution in Hive-land.

> create external table on read only location fails
> -
>
> Key: SPARK-20370
> URL: https://issues.apache.org/jira/browse/SPARK-20370
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.1.0
> Environment: EMR 5.4.0, hadoop 2.7.3, spark 2.1.0
>Reporter: Gaurav Shah
>Priority: Minor
>
> Create External table via following fails:
>  sqlContext.createExternalTable(
>  "table_name",
>  "org.apache.spark.sql.parquet",
>  inputSchema,
>  Map(
>"path" -> "s3a://bucket-name/folder",
>"mergeSchema" -> "false"
>  )
>)
> Spark in the following commit tries to check if it has write access to giving 
> location, which fails and so the table meta creation fails. 
> https://github.com/apache/spark/pull/13270/files
> The table creation script works even if cluster has read only access in spark 
> 1.6, but fails in spark 2.0



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20560) Review Spark's handling of filesystems returning "localhost" in getFileBlockLocations

2017-05-02 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993323#comment-15993323
 ] 

Steve Loughran commented on SPARK-20560:


To follow this up, I've now got a test which verifies that (a) s3a returns 
"localhost" and (b) Spark discards it. This'll catch any regressions in the s3a 
client.
{code}
val source = CSV_TESTFILE.get
val fs = getFilesystem(source)
val blockLocations = fs.getFileBlockLocations(source, 0, 1)
assert(1 === blockLocations.length,
  s"block location array size wrong: ${blockLocations}")
val hosts = blockLocations(0).getHosts
assert(1 === hosts.length, s"wrong host size ${hosts}")
assert("localhost" === hosts(0), "hostname")

val path = source.toString
val rdd = sc.hadoopFile[LongWritable, Text, TextInputFormat](path, 1)
val input = rdd.asInstanceOf[HadoopRDD[_, _]]
val partitions = input.getPartitions
val locations = input.getPreferredLocations(partitions.head)
assert(locations.isEmpty, s"Location list not empty ${locations}")
{code}

> Review Spark's handling of filesystems returning "localhost" in 
> getFileBlockLocations
> -
>
> Key: SPARK-20560
> URL: https://issues.apache.org/jira/browse/SPARK-20560
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Steve Loughran
>Priority: Minor
>
> Some filesystems (S3a, Azure WASB) return "localhost" as the response to 
> {{FileSystem.getFileBlockLocations(path)}}. If this is then used as the 
> preferred host when scheduling work, there's a risk that work will be queued 
> on one host, rather than spread across the cluster.
> HIVE-14060 and TEZ-3291 have both seen it in their schedulers.
> I don't know if Spark does it, someone needs to look at the code, maybe write 
> some tests



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20608) Standby namenodes should be allowed to included in yarn.spark.access.namenodes to support HDFS HA

2017-05-11 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006454#comment-16006454
 ] 

Steve Loughran commented on SPARK-20608:


One thing to consider here is starting with a test to see what happens; 
https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
 shows how to set up a MiniDFSCluster to run HA.

But yes, Marcelo is right: you don't use {{hdfs://hostname1:8000/}}, you use 
{{hdfs://hacluster:8000/}} and rely on the list of hosts in 
{{dfs.ha.namenodes.hacluster}} to tell HDFS which namenodes to try to talk to. I suspect 
it's important to have a cluster name which is not a resolvable hostname, as 
that may confuse the client (just a guess; I haven't looked at the source to check).
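For reference, the client-side HA wiring behind such a URI looks roughly like the sketch 
below; the nameservice ID ("hacluster"), hostnames and port are placeholders, not values 
taken from this issue, and normally these settings live in hdfs-site.xml rather than code.
{code}
import org.apache.hadoop.conf.Configuration

// Rough sketch of the client-side settings that make hdfs://hacluster/ resolvable.
val conf = new Configuration()
conf.set("dfs.nameservices", "hacluster")
conf.set("dfs.ha.namenodes.hacluster", "nn1,nn2")
conf.set("dfs.namenode.rpc-address.hacluster.nn1", "namenode01:8020")
conf.set("dfs.namenode.rpc-address.hacluster.nn2", "namenode02:8020")
conf.set("dfs.client.failover.proxy.provider.hacluster",
  "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
// The application then only ever refers to hdfs://hacluster/..., never to an individual namenode.
{code}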

> Standby namenodes should be allowed to included in 
> yarn.spark.access.namenodes to support HDFS HA
> -
>
> Key: SPARK-20608
> URL: https://issues.apache.org/jira/browse/SPARK-20608
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Submit, YARN
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Yuechen Chen
>Priority: Minor
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> If one Spark Application need to access remote namenodes, 
> yarn.spark.access.namenodes should be only be configged in spark-submit 
> scripts, and Spark Client(On Yarn) would fetch HDFS credential periodically.
> If one hadoop cluster is configured by HA, there would be one active namenode 
> and at least one standby namenode. 
> However, if yarn.spark.access.namenodes includes both active and standby 
> namenodes, Spark Application will be failed for the reason that the standby 
> namenode would not access by Spark for org.apache.hadoop.ipc.StandbyException.
> I think it won't cause any bad effect to config standby namenodes in 
> yarn.spark.access.namenodes, and my Spark Application can be able to sustain 
> the failover of Hadoop namenode.
> HA Examples:
> Spark-submit script: 
> yarn.spark.access.namenodes=hdfs://namenode01,hdfs://namenode02
> Spark Application Codes:
> dataframe.write.parquet(getActiveNameNode(...) + hdfsPath)






[jira] [Commented] (SPARK-21074) Parquet files are read fully even though only count() is requested

2017-06-20 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056309#comment-16056309
 ] 

Steve Loughran commented on SPARK-21074:


Given this is an s3 URL, it may be amplifying the problem, with issues such as 
HADOOP-11570 and HADOOP-12376: HTTP 1.1 clients optimise prematurely for 
connection re-use by reading all the way to the end of the stream. An open(0) 
followed by a seek(end - a bit) would cause them to read all the way through the file 
first, then re-issue a new GET.

Assuming this is the Amazon EMR S3 client, I can't say if it has the same 
issue.

# Test with local files to see if the problem goes away. If it doesn't, the problem is 
in Parquet.
# Move to Hadoop 2.7+ JARs and s3a:// URLs. Better yet, drop in the 2.8 binaries and 
the matching AWS SDK to get the high-performance seek code, including the metrics you 
can see when you call toString() on the FS instance (see the sketch below).
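A rough way to look at those statistics, as a sketch only: it assumes s3a:// URLs, the 
Hadoop 2.8+ hadoop-aws JAR on the classpath, and uses the bucket name from the report.
{code}
import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession

// Run the count over s3a://, then print the FileSystem instance; in Hadoop 2.8+
// S3AFileSystem.toString() includes input-stream statistics (bytes read, seeks,
// connections re-opened), which show whether whole-file reads are happening.
val spark = SparkSession.builder().appName("s3a-read-stats").getOrCreate()
val path = "s3a://my-bucket/test"
spark.read.option("mergeSchema", "false").parquet(path).count()
val fs = FileSystem.get(new URI(path), spark.sparkContext.hadoopConfiguration)
println(fs)
{code}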

> Parquet files are read fully even though only count() is requested
> --
>
> Key: SPARK-21074
> URL: https://issues.apache.org/jira/browse/SPARK-21074
> Project: Spark
>  Issue Type: Improvement
>  Components: Optimizer
>Affects Versions: 2.1.0
>Reporter: Michael Spector
>
> I have the following sample code that creates parquet files:
> {code:java}
> val spark = SparkSession.builder()
>   .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", 
> "2")
>   .config("spark.hadoop.parquet.metadata.read.parallelism", "50")
>   .appName("Test Write").getOrCreate()
> val sqc = spark.sqlContext
> import sqc.implicits._
> val random = new scala.util.Random(31L)
> (1465720077 to 1465720077+1000).map(x => Event(x, random.nextString(2)))
>   .toDS()
>   .write
>   .mode(SaveMode.Overwrite)
>   .parquet("s3://my-bucket/test")
> {code}
> Afterwards, I'm trying to read these files with the following code:
> {code:java}
> val spark = SparkSession.builder()
>   .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", 
> "2")
>   .config("spark.hadoop.parquet.metadata.read.parallelism", "50")
>   .config("spark.sql.parquet.filterPushdown", "true")
>   .appName("Test Read").getOrCreate()
> spark.sqlContext.read
>   .option("mergeSchema", "false")
>   .parquet("s3://my-bucket/test")
>   .count()
> {code}
> I've enabled DEBUG log level to see what requests are actually sent through 
> S3 API, and I've figured out that in addition to parquet "footer" retrieval 
> there are requests that ask for whole file content.
> For example, this is full content request example:
> {noformat}
> 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET 
> /test/part-0-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet 
> HTTP/1.1[\r][\n]"
> 
> 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 
> 0-7472093/7472094[\r][\n]"
> 
> 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 
> 7472094[\r][\n]"
> {noformat}
> And this is partial request example for footer only:
> {noformat}
> 17/06/13 05:46:50 DEBUG headers: http-outgoing-2 >> GET 
> /test/part-0-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1
> 
> 17/06/13 05:46:50 DEBUG headers: http-outgoing-2 >> Range: 
> bytes=7472086-7472094
> ...
> 17/06/13 05:46:50 DEBUG wire: http-outgoing-2 << "Content-Length: 8[\r][\n]"
> 
> {noformat}
> Here's what FileScanRDD prints:
> {noformat}
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: 
> s3://my-bucket/test/part-4-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet,
>  range: 0-7473020, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: 
> s3://my-bucket/test/part-00011-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet,
>  range: 0-7472503, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: 
> s3://my-bucket/test/part-6-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet,
>  range: 0-7472501, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: 
> s3://my-bucket/test/part-7-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet,
>  range: 0-7473104, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: 
> s3://my-bucket/test/part-3-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet,
>  range: 0-7472458, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: 
> s3://my-bucket/test/part-00012-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet,
>  range: 0-7472594, partition values: [empty row]
> 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: 
> s3://my-bucket/test/part-1-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet,
>  range: 0-7472984, partition values: [empty row]
> 17/06/13 05:46:52 INFO 

[jira] [Commented] (SPARK-19111) S3 Mesos history upload fails silently if too large

2017-06-20 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056516#comment-16056516
 ] 

Steve Loughran commented on SPARK-19111:


Followup: [~drcrallen]; Hadoop 2.8 is out the door with the JARs you need for 
the S3a block output. Curious why it's blocking the history server though. 
SPARK-11373 adds metrics to the SHS, which may help
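For anyone hitting this: the block output is switched on through the fast-upload options. 
A minimal sketch, assuming Hadoop 2.8 property names; the buffer mode chosen here is a 
placeholder, not a recommendation for this workload.
{code}
import org.apache.spark.sql.SparkSession

// Enable the S3A incremental block upload so large files (e.g. event logs) are
// uploaded in blocks as they are written, instead of in one PUT at close().
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.fast.upload", "true")         // use the block output stream
  .config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")  // or "array" / "bytebuffer"
  .getOrCreate()
{code}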

> S3 Mesos history upload fails silently if too large
> ---
>
> Key: SPARK-19111
> URL: https://issues.apache.org/jira/browse/SPARK-19111
> Project: Spark
>  Issue Type: Bug
>  Components: EC2, Mesos, Spark Core
>Affects Versions: 2.0.0
>Reporter: Charles Allen
>
> {code}
> 2017-01-06T21:32:32,928 INFO [main] org.apache.spark.ui.SparkUI - Stopped 
> Spark web UI at http://REDACTED:4041
> 2017-01-06T21:32:32,938 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.jvmGCTime
> 2017-01-06T21:32:32,939 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.shuffle.read.localBlocksFetched
> 2017-01-06T21:32:32,939 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.resultSerializationTime
> 2017-01-06T21:32:32,939 ERROR [heartbeat-receiver-event-loop-thread] 
> org.apache.spark.scheduler.LiveListenerBus - SparkListenerBus has already 
> stopped! Dropping event SparkListenerExecutorMetricsUpdate(
> 364,WrappedArray())
> 2017-01-06T21:32:32,939 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.resultSize
> 2017-01-06T21:32:32,939 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.peakExecutionMemory
> 2017-01-06T21:32:32,939 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.shuffle.read.fetchWaitTime
> 2017-01-06T21:32:32,939 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.memoryBytesSpilled
> 2017-01-06T21:32:32,940 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.shuffle.read.remoteBytesRead
> 2017-01-06T21:32:32,940 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.diskBytesSpilled
> 2017-01-06T21:32:32,940 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.shuffle.read.localBytesRead
> 2017-01-06T21:32:32,940 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.shuffle.read.recordsRead
> 2017-01-06T21:32:32,940 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.executorDeserializeTime
> 2017-01-06T21:32:32,940 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: output/bytes
> 2017-01-06T21:32:32,941 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.executorRunTime
> 2017-01-06T21:32:32,941 INFO [SparkListenerBus] 
> com.metamx.starfire.spark.SparkDriver - emitting metric: 
> internal.metrics.shuffle.read.remoteBlocksFetched
> 2017-01-06T21:32:32,943 INFO [main] 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem - OutputStream for key 
> 'eventLogs/remnant/46bf8f87-6de6-4da8-9cba-5b2fecd0875e-1387.inprogress' 
> closed. Now beginning upload
> 2017-01-06T21:32:32,963 ERROR [heartbeat-receiver-event-loop-thread] 
> org.apache.spark.scheduler.LiveListenerBus - SparkListenerBus has already 
> stopped! Dropping event SparkListenerExecutorMetricsUpdate(905,WrappedArray())
> 2017-01-06T21:32:32,973 ERROR [heartbeat-receiver-event-loop-thread] 
> org.apache.spark.scheduler.LiveListenerBus - SparkListenerBus has already 
> stopped! Dropping event SparkListenerExecutorMetricsUpdate(519,WrappedArray())
> 2017-01-06T21:32:32,988 ERROR [heartbeat-receiver-event-loop-thread] 
> org.apache.spark.scheduler.LiveListenerBus - SparkListenerBus has already 
> stopped! Dropping event SparkListenerExecutorMetricsUpdate(596,WrappedArray())
> {code}
> Running spark on mesos, some large jobs fail to upload to the history server 
> storage!
> A successful sequence of events in the log that yield an upload are as 
> follows:
> {code}
> 2017-01-06T19:14:32,925 INFO [main] 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem - OutputStream for key 
> 'eventLogs/remnant/46bf8f87-6de6-4da8-9cba-5b2fecd0875e-1434.inprogress' 
> writing to tempfile '/mnt/tmp/hadoop/output-2516573909248961808.tmp'
> 2017-01-06T21:59:14,789 INFO [main] 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem - OutputStream for key 
> 

[jira] [Commented] (SPARK-11373) Add metrics to the History Server and providers

2017-06-22 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16059141#comment-16059141
 ] 

Steve Loughran commented on SPARK-11373:


metrics might help with understanding the s3 load issues in SPARK-19111

> Add metrics to the History Server and providers
> ---
>
> Key: SPARK-11373
> URL: https://issues.apache.org/jira/browse/SPARK-11373
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Steve Loughran
>
> The History server doesn't publish metrics about JVM load or anything from 
> the history provider plugins. This means that performance problems from 
> massive job histories aren't visible to management tools, and nor are any 
> provider-generated metrics such as time to load histories, failed history 
> loads, the number of connectivity failures talking to remote services, etc.
> If the history server set up a metrics registry and offered the option to 
> publish its metrics, then management tools could view this data.
> # the metrics registry would need to be passed down to the instantiated 
> {{ApplicationHistoryProvider}}, in order for it to register its metrics.
> # if the codahale metrics servlet were registered under a path such as 
> {{/metrics}}, the values would be visible as HTML and JSON, without the need 
> for management tools.
> # Integration tests could also retrieve the JSON-formatted data and use it as 
> part of the test suites.






[jira] [Commented] (SPARK-20799) Unable to infer schema for ORC on reading ORC from S3

2017-05-19 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017258#comment-16017258
 ] 

Steve Loughran commented on SPARK-20799:


bq. Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login 
details. This is insecure and may be unsupported in future., but this should 
not mean that it shouldn't work anymore.

It probably will stop working at some point in the future, as putting secrets in 
the URIs is too dangerous: everything logs them on the assumption that they aren't 
sensitive data. The {{S3xLoginHelper}} not only warns you, it makes a best-effort 
attempt to strip the secrets out of the public URI, hence the logs and the messages 
telling you off.

Prior to Hadoop 2.8, the sole *defensible* use case for secrets in URIs was that it 
was the only way to have different logins on different buckets. In Hadoop 2.8 
we added the ability to configure any of the fs.s3a. options on a per-bucket 
basis, including the secret logins, endpoints, and other important values.

I see what may be happening; in that case it probably constitutes a Hadoop 
regression: if the filesystem's URI is converted to a string it will have these 
stripped, so if something is going path -> URI -> String -> path, the secrets 
will be lost.

If you are seeing this stack trace, it means you are using Hadoop 2.8 or 
something else with the HADOOP-3733 patch in it. What version of Hadoop (or 
HDP, CDH..) are you using? If it is based on the full Apache 2.8 release, you 
get 

# per-bucket config to allow you to [configure each bucket 
separately|http://hadoop.apache.org/docs/r2.8.0/hadoop-aws/tools/hadoop-aws/index.html#Configurations_different_S3_buckets]
# the ability to use JCEKS files to keep the secrets out the configs
# session token support.

Accordingly, if you state the version, I may be able to look at what's happening 
in a bit more detail.


> Unable to infer schema for ORC on reading ORC from S3
> -
>
> Key: SPARK-20799
> URL: https://issues.apache.org/jira/browse/SPARK-20799
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: Jork Zijlstra
>
> We are getting the following exception: 
> {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. 
> It must be specified manually.{code}
> Combining following factors will cause it:
> - Use S3
> - Use format ORC
> - Don't apply partitioning to the data
> - Embed AWS credentials in the path
> The problem is in the PartitioningAwareFileIndex def allFiles()
> {code}
> leafDirToChildrenFiles.get(qualifiedPath)
>   .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
>   .getOrElse(Array.empty)
> {code}
> leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the 
> qualifiedPath contains the path WITH credentials.
> So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no 
> data is read and the schema cannot be defined.
> Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login 
> details. This is insecure and may be unsupported in future., but this should 
> not mean that it shouldn't work anymore.
> Workaround:
> Move the AWS credentials from the path to the SparkSession
> {code}
> SparkSession.builder
>   .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId})
>   .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey})
> {code}






[jira] [Commented] (SPARK-20799) Unable to infer schema for ORC on reading ORC from S3

2017-05-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022807#comment-16022807
 ] 

Steve Loughran commented on SPARK-20799:


If what I think is happening is right, then it's the security tightening of 
HADOOP-3733 which has stopped this. It is sort-of-a-regression, but as it has a 
security benefit ("stops leaking your secrets through logs") it's not something we 
want to revert. Anyway, it *never* worked if you had a "/" in your secret key, 
so the sole reason it worked for you in the past is that you don't (see: I know 
something about your secret credentials :)

Hadoop 2.8 is way better for S3A support all round, so I'd encourage you to 
stay and play. In particular,
# switch from s3n:// to s3a:// for your URLs, to get the new high-performance 
client
# try setting {{fs.s3a.experimental.input.fadvise=random}} in your settings and you 
should expect to see a significant speedup in ORC input.

If the use case here is that you want to use separate credentials for a 
specific bucket, you can use per-bucket config now
{code}
fs.s3a.bucket.site-2.access.key=my access key
fs.s3a.bucket.site-2.secret.key=my access secret
{code}

then when you refer to {{s3a://site-2/path}} ,  the specific key & secret for 
that bucket are picked up. This is why you shouldn't need to use inline secrets 
at all


> Unable to infer schema for ORC on reading ORC from S3
> -
>
> Key: SPARK-20799
> URL: https://issues.apache.org/jira/browse/SPARK-20799
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Jork Zijlstra
>
> We are getting the following exception: 
> {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. 
> It must be specified manually.{code}
> Combining following factors will cause it:
> - Use S3
> - Use format ORC
> - Don't apply partitioning to the data
> - Embed AWS credentials in the path
> The problem is in the PartitioningAwareFileIndex def allFiles()
> {code}
> leafDirToChildrenFiles.get(qualifiedPath)
>   .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
>   .getOrElse(Array.empty)
> {code}
> leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the 
> qualifiedPath contains the path WITH credentials.
> So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no 
> data is read and the schema cannot be defined.
> Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login 
> details. This is insecure and may be unsupported in future., but this should 
> not mean that it shouldn't work anymore.
> Workaround:
> Move the AWS credentials from the path to the SparkSession
> {code}
> SparkSession.builder
>   .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId})
>   .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey})
> {code}






[jira] [Updated] (SPARK-20799) Unable to infer schema for ORC on S3N when secrets are in the URL

2017-05-24 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-20799:
---
Summary: Unable to infer schema for ORC on S3N when secrets are in the URL  
(was: Unable to infer schema for ORC on reading ORC from S3)

> Unable to infer schema for ORC on S3N when secrets are in the URL
> -
>
> Key: SPARK-20799
> URL: https://issues.apache.org/jira/browse/SPARK-20799
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Jork Zijlstra
>
> We are getting the following exception: 
> {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. 
> It must be specified manually.{code}
> Combining following factors will cause it:
> - Use S3
> - Use format ORC
> - Don't apply partitioning to the data
> - Embed AWS credentials in the path
> The problem is in the PartitioningAwareFileIndex def allFiles()
> {code}
> leafDirToChildrenFiles.get(qualifiedPath)
>   .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
>   .getOrElse(Array.empty)
> {code}
> leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the 
> qualifiedPath contains the path WITH credentials.
> So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no 
> data is read and the schema cannot be defined.
> Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login 
> details. This is insecure and may be unsupported in future., but this should 
> not mean that it shouldn't work anymore.
> Workaround:
> Move the AWS credentials from the path to the SparkSession
> {code}
> SparkSession.builder
>   .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId})
>   .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey})
> {code}






[jira] [Commented] (SPARK-19669) Open up visibility for sharedState, sessionState, and a few other functions

2017-05-26 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16026219#comment-16026219
 ] 

Steve Loughran commented on SPARK-19669:


Thanks for this; it's very nice to have Logging usable outside Spark's own codebase 
again. I had my own copy-and-paste version, but when you start playing with traits across 
things, life gets hard.

> Open up visibility for sharedState, sessionState, and a few other functions
> ---
>
> Key: SPARK-19669
> URL: https://issues.apache.org/jira/browse/SPARK-19669
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Reynold Xin
>Assignee: Reynold Xin
> Fix For: 2.2.0
>
>
> To ease debugging, most of Spark SQL internals have public level visibility. 
> Two of the most important internal states, sharedState and sessionState, 
> however, are package private. It would make more sense to open these up as 
> well with clear documentation that they are internal.
> In addition, users currently have way to set active/default SparkSession, but 
> no way to actually get them back. We should open those up as well.






[jira] [Commented] (SPARK-8578) Should ignore user defined output committer when appending data

2017-05-25 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16024526#comment-16024526
 ] 

Steve Loughran commented on SPARK-8578:
---

Given SPARK-10063 has pulled {{DirectParquetOutputCommitter}} on account of 
the incompatibility between 'direct' output and the commit protocol, it may be time 
to revisit this.

> Should ignore user defined output committer when appending data
> ---
>
> Key: SPARK-8578
> URL: https://issues.apache.org/jira/browse/SPARK-8578
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.0
>Reporter: Cheng Lian
>Assignee: Yin Huai
> Fix For: 1.4.1, 1.5.0
>
>
> When appending data to a file system via Hadoop API, it's safer to ignore 
> user defined output committer classes like {{DirectParquetOutputCommitter}}. 
> Because it's relatively hard to handle task failure in this case.  For 
> example, {{DirectParquetOutputCommitter}} directly writes to the output 
> directory to boost write performance when working with S3. However, there's 
> no general way to determine task output file path of a specific task in 
> Hadoop API, thus we don't know to revert a failed append job. (When doing 
> overwrite, we can just remove the whole output directory.)






[jira] [Commented] (SPARK-20886) HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null

2017-05-25 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16024886#comment-16024886
 ] 

Steve Loughran commented on SPARK-20886:


Stack trace: after
{code}
2017-05-25 16:22:10,807 [dag-scheduler-event-loop] INFO  scheduler.DAGScheduler 
(Logging.scala:logInfo(54)) - ResultStage 2 (apply at Transformer.scala:22) 
failed in 0.065 s due to Job aborted due to stage failure: Task 0 in stage 2.0 
failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, 
localhost, executor driver): org.apache.spark.SparkException: Task failed while 
writing rows
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:263)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:182)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:181)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: Committer 
has no workpath 
FileOutputCommitter{outputPath=file:/Users/stevel/Projects/sparkwork/cloud-integration/cloud-examples/target/tmp/spark-41b05e5f-93eb-4b1b-8e9d-7fd930641267,
 workPath=null, algorithmVersion=2, skipCleanup=false, 
ignoreCleanupFailures=false}
at scala.Predef$.require(Predef.scala:224)
at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.newTaskTempFile(HadoopMapReduceCommitProtocol.scala:78)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:291)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:305)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:249)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1365)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:252)
... 8 more
{code}

> HadoopMapReduceCommitProtocol to fail with message if 
> FileOutputCommitter.getWorkPath==null
> ---
>
> Key: SPARK-20886
> URL: https://issues.apache.org/jira/browse/SPARK-20886
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Steve Loughran
>Priority: Trivial
>
> This is minor, and the root cause is my fault *elsewhere*, but it's the patch 
> I used to track down the problem.
> If {{HadoopMapReduceCommitProtocol}} has a {{FileOutputCommitter}} for 
> committing things, and *somehow* that's been configured with a 
> {{JobAttemptContext}}, not a {{TaskAttemptContext}}, then the committer NPEs.
> A {{require()}} statement can validate the working path and so point the 
> blame at whoever's code is confused.






[jira] [Commented] (SPARK-20886) HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null

2017-05-25 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16024885#comment-16024885
 ] 

Steve Loughran commented on SPARK-20886:


Stack trace: before
{code}
Driver stacktrace:
  at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  ...
  Cause: org.apache.spark.SparkException: Task failed while writing rows
  at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:263)
  at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:182)
  at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:181)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
  ...
  Cause: java.lang.NullPointerException:
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.newTaskTempFile(HadoopMapReduceCommitProtocol.scala:76)
  at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:291)
  at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:305)
  at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:249)
  at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
  at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1365)
  at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:252)
  at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:182)
  at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:181)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
{code}

> HadoopMapReduceCommitProtocol to fail with message if 
> FileOutputCommitter.getWorkPath==null
> ---
>
> Key: SPARK-20886
> URL: https://issues.apache.org/jira/browse/SPARK-20886
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Steve Loughran
>Priority: Trivial
>
> This is minor, and the root cause is my fault *elsewhere*, but it's the patch 
> I used to track down the problem.
> If {{HadoopMapReduceCommitProtocol}} has a {{FileOutputCommitter}} for 
> committing things, and *somehow* that's been configured with a 
> {{JobAttemptContext}}, not a {{TaskAttemptContext}}, then the committer NPEs.
> A {{require()}} statement can validate the working path and so point the 
> blame at whoever's code is confused.






[jira] [Created] (SPARK-20886) HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null

2017-05-25 Thread Steve Loughran (JIRA)
Steve Loughran created SPARK-20886:
--

 Summary: HadoopMapReduceCommitProtocol to fail with message if 
FileOutputCommitter.getWorkPath==null
 Key: SPARK-20886
 URL: https://issues.apache.org/jira/browse/SPARK-20886
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Steve Loughran
Priority: Trivial


This is minor, and the root cause is my fault *elsewhere*, but it's the patch I 
used to track down the problem.

If {{HadoopMapReduceCommitProtocol}} has a {{FileOutputCommitter}} for 
committing things, and *somehow* that's been configured with a 
{{JobAttemptContext}}, not a {{TaskAttemptContext}}, then the committer NPEs.

A {{require()}} statement can validate the working path and so point the blame 
at whoever's code is confused.
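Roughly the shape of the check, as a sketch of the idea rather than the actual patch:
{code}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Fail with a meaningful message instead of an NPE when the committer has no
// work path (the symptom of it having been set up with a JobAttemptContext).
def workPathOf(committer: FileOutputCommitter): Path = {
  require(committer.getWorkPath != null,
    s"Committer has no work path: $committer -- was it configured with a " +
      "JobAttemptContext rather than a TaskAttemptContext?")
  committer.getWorkPath
}
{code}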






[jira] [Updated] (SPARK-20799) Unable to infer schema for ORC on S3N when secrets are in the URL

2017-05-25 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-20799:
---
Priority: Minor  (was: Major)

> Unable to infer schema for ORC on S3N when secrets are in the URL
> -
>
> Key: SPARK-20799
> URL: https://issues.apache.org/jira/browse/SPARK-20799
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Jork Zijlstra
>Priority: Minor
>
> We are getting the following exception: 
> {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. 
> It must be specified manually.{code}
> Combining following factors will cause it:
> - Use S3
> - Use format ORC
> - Don't apply partitioning to the data
> - Embed AWS credentials in the path
> The problem is in the PartitioningAwareFileIndex def allFiles()
> {code}
> leafDirToChildrenFiles.get(qualifiedPath)
>   .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
>   .getOrElse(Array.empty)
> {code}
> leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the 
> qualifiedPath contains the path WITH credentials.
> So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no 
> data is read and the schema cannot be defined.
> Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login 
> details. This is insecure and may be unsupported in future., but this should 
> not mean that it shouldn't work anymore.
> Workaround:
> Move the AWS credentials from the path to the SparkSession
> {code}
> SparkSession.builder
>   .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId})
>   .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey})
> {code}






[jira] [Updated] (SPARK-20799) Unable to infer schema for ORC/Parquet on S3N when secrets are in the URL

2017-06-02 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-20799:
---
Summary: Unable to infer schema for ORC/Parquet on S3N when secrets are in 
the URL  (was: Unable to infer schema for ORC on S3N when secrets are in the 
URL)

> Unable to infer schema for ORC/Parquet on S3N when secrets are in the URL
> -
>
> Key: SPARK-20799
> URL: https://issues.apache.org/jira/browse/SPARK-20799
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Jork Zijlstra
>Priority: Minor
>
> We are getting the following exception: 
> {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. 
> It must be specified manually.{code}
> Combining following factors will cause it:
> - Use S3
> - Use format ORC
> - Don't apply partitioning to the data
> - Embed AWS credentials in the path
> The problem is in the PartitioningAwareFileIndex def allFiles()
> {code}
> leafDirToChildrenFiles.get(qualifiedPath)
>   .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
>   .getOrElse(Array.empty)
> {code}
> leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the 
> qualifiedPath contains the path WITH credentials.
> So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no 
> data is read and the schema cannot be defined.
> Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login 
> details. This is insecure and may be unsupported in future., but this should 
> not mean that it shouldn't work anymore.
> Workaround:
> Move the AWS credentials from the path to the SparkSession
> {code}
> SparkSession.builder
>   .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId})
>   .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey})
> {code}






[jira] [Updated] (SPARK-20799) Unable to infer schema for ORC/Parquet on S3N when secrets are in the URL

2017-06-02 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-20799:
---
Environment: Hadoop 2.8.0 binaries

> Unable to infer schema for ORC/Parquet on S3N when secrets are in the URL
> -
>
> Key: SPARK-20799
> URL: https://issues.apache.org/jira/browse/SPARK-20799
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
> Environment: Hadoop 2.8.0 binaries
>Reporter: Jork Zijlstra
>Priority: Minor
>
> We are getting the following exception: 
> {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. 
> It must be specified manually.{code}
> Combining following factors will cause it:
> - Use S3
> - Use format ORC
> - Don't apply partitioning to the data
> - Embed AWS credentials in the path
> The problem is in the PartitioningAwareFileIndex def allFiles()
> {code}
> leafDirToChildrenFiles.get(qualifiedPath)
>   .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
>   .getOrElse(Array.empty)
> {code}
> leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the 
> qualifiedPath contains the path WITH credentials.
> So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no 
> data is read and the schema cannot be defined.
> Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login 
> details. This is insecure and may be unsupported in future., but this should 
> not mean that it shouldn't work anymore.
> Workaround:
> Move the AWS credentials from the path to the SparkSession
> {code}
> SparkSession.builder
>   .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId})
>   .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey})
> {code}






[jira] [Commented] (SPARK-21077) Cannot access public files over S3 protocol

2017-06-16 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051680#comment-16051680
 ] 

Steve Loughran commented on SPARK-21077:


Like people say, this is inevitably a config problem; Hadoop 2.7.x has the 
credential provider you need.

You should be able to read {{s3a://landsat-pds/scene_list.gz}} as a CSV file 
with anonymous credentials.
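A sketch of what that looks like from Spark; the provider class ships in hadoop-aws, but 
the exact provider property name should be checked against the Hadoop version on the 
classpath.
{code}
import org.apache.spark.sql.SparkSession

// Read the public landsat-pds scene list anonymously over s3a://.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
  .getOrCreate()

val scenes = spark.read
  .option("header", "true")
  .csv("s3a://landsat-pds/scene_list.gz")   // gzip is decompressed automatically
scenes.show(5)
{code}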

> Cannot access public files over S3 protocol
> ---
>
> Key: SPARK-21077
> URL: https://issues.apache.org/jira/browse/SPARK-21077
> Project: Spark
>  Issue Type: Bug
>  Components: EC2
>Affects Versions: 2.1.0
> Environment: Spark 2.1.0 default installation. No existing hadoop, 
> using the one distributed with Spark.
> Added in $SPARK_HOME/jars:  
> hadoop-aws-2.7.3.jar and aws-java-sdk-1.7.4.jar
> Added endpoint configuration in $SPARK_HOME/conf/core-site.xml (I want to 
> access datasets hosted by organisation with CEPH; follows S3 protocols).
> Ubuntu 14.04 x64.
>Reporter: Ciprian Tomoiaga
>
> I am trying to access a dataset with public (anonymous) credentials via the 
> S3 (or S3a, s3n) protocol. 
> It fails with the error that no provider in chain can supply the credentials.
> I asked our sysadmin to add some dummy credentials, and if I set them up (via 
> link or config) then I have access.
> I tried setting the config :
> {code:xml}
> 
>   fs.s3a.credentials.provider
>   org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider
> 
> {code}
> but it still doesn't work.
> I suggested that it is a java-aws issue 
> [here|https://github.com/aws/aws-sdk-java/issues/1122#issuecomment-307814540],
>  but they said it is not.
> Any hints on how to use public S3 files from Spark ?






[jira] [Commented] (SPARK-7481) Add spark-hadoop-cloud module to pull in object store support

2017-05-08 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000502#comment-16000502
 ] 

Steve Loughran commented on SPARK-7481:
---

thank you!

> Add spark-hadoop-cloud module to pull in object store support
> -
>
> Key: SPARK-7481
> URL: https://issues.apache.org/jira/browse/SPARK-7481
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.1.0
>Reporter: Steve Loughran
>Assignee: Steve Loughran
> Fix For: 2.3.0
>
>
> To keep the s3n classpath right, to add s3a, swift & azure, the dependencies 
> of spark in a 2.6+ profile need to add the relevant object store packages 
> (hadoop-aws, hadoop-openstack, hadoop-azure)






[jira] [Commented] (SPARK-20608) Standby namenodes should be allowed to included in yarn.spark.access.namenodes to support HDFS HA

2017-05-05 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15998150#comment-15998150
 ] 

Steve Loughran commented on SPARK-20608:


Probably good to pull in someone who understands HDFS HA; I nominate 
[~liuml07]. 

My main worry is that RemoteException could be a symptom of something more 
serious than the node being in standby, but I don't know enough about NN HA for 
my opinions to be trusted.



> Standby namenodes should be allowed to included in 
> yarn.spark.access.namenodes to support HDFS HA
> -
>
> Key: SPARK-20608
> URL: https://issues.apache.org/jira/browse/SPARK-20608
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Submit, YARN
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Yuechen Chen
>Priority: Minor
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> If one Spark Application need to access remote namenodes, 
> yarn.spark.access.namenodes should be only be configged in spark-submit 
> scripts, and Spark Client(On Yarn) would fetch HDFS credential periodically.
> If one hadoop cluster is configured by HA, there would be one active namenode 
> and at least one standby namenode. 
> However, if yarn.spark.access.namenodes includes both active and standby 
> namenodes, Spark Application will be failed for the reason that the standby 
> namenode would not access by Spark for org.apache.hadoop.ipc.StandbyException.
> I think it won't cause any bad effect to config standby namenodes in 
> yarn.spark.access.namenodes, and my Spark Application can be able to sustain 
> the failover of Hadoop namenode.
> HA Examples:
> Spark-submit script: 
> yarn.spark.access.namenodes=hdfs://namenode01,hdfs://namenode02
> Spark Application Codes:
> dataframe.write.parquet(getActiveNameNode(...) + hdfsPath)






[jira] [Created] (SPARK-20560) Review Spark's handling of filesystems returning "localhost" in getFileBlockLocations

2017-05-02 Thread Steve Loughran (JIRA)
Steve Loughran created SPARK-20560:
--

 Summary: Review Spark's handling of filesystems returning 
"localhost" in getFileBlockLocations
 Key: SPARK-20560
 URL: https://issues.apache.org/jira/browse/SPARK-20560
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 2.1.0
Reporter: Steve Loughran
Priority: Minor


Some filesystems (S3a, Azure WASB) return "localhost" as the response to 
{{FileSystem.getFileBlockLocations(path)}}. If this is then used as the 
preferred host when scheduling work, there's a risk that work will be queued on 
one host, rather than spread across the cluster.

HIVE-14060 and TEZ-3291 have both seen it in their schedulers.

I don't know if Spark does it, someone needs to look at the code, maybe write 
some tests






[jira] [Commented] (SPARK-20560) Review Spark's handling of filesystems returning "localhost" in getFileBlockLocations

2017-05-02 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993008#comment-15993008
 ] 

Steve Loughran commented on SPARK-20560:


{{FileSystem.getFileBlockLocations(path)}} is only invoked from 
{{HdfsUtils.getFileSegmentLocations}}, and used as a source of data for 
{{RDD.preferredLocations}}.

I don't see anything explicit in the code that detects & reacts to the FS 
call returning localhost; I'll do some tests downstream to see what surfaces 
against S3. Unless the scheduler has some explicit "localhost -> anywhere" map, 
it might make sense for {{HdfsUtils.getFileSegmentLocations}} to downgrade 
"localhost" to None, on the basis that in a cluster FS, the data clearly 
doesn't know where it is.
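i.e. something along these lines, a sketch of the idea rather than actual Spark code:
{code}
import org.apache.hadoop.fs.BlockLocation

// Treat a "localhost" block location from an object store as "no locality information",
// so the scheduler doesn't pile all the work onto one node.
def preferredHosts(location: BlockLocation): Seq[String] =
  location.getHosts.toSeq.filterNot(_ == "localhost")
{code}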



> Review Spark's handling of filesystems returning "localhost" in 
> getFileBlockLocations
> -
>
> Key: SPARK-20560
> URL: https://issues.apache.org/jira/browse/SPARK-20560
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Steve Loughran
>Priority: Minor
>
> Some filesystems (S3a, Azure WASB) return "localhost" as the response to 
> {{FileSystem.getFileBlockLocations(path)}}. If this is then used as the 
> preferred host when scheduling work, there's a risk that work will be queued 
> on one host, rather than spread across the cluster.
> HIVE-14060 and TEZ-3291 have both seen it in their schedulers.
> I don't know if Spark does it, someone needs to look at the code, maybe write 
> some tests






[jira] [Commented] (SPARK-19582) DataFrameReader conceptually inadequate

2017-05-02 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15992985#comment-15992985
 ] 

Steve Loughran commented on SPARK-19582:


All Spark is doing is taking a URL to data, mapping that to an FS 
implementation classname, and expecting that to implement the methods in 
`org.apache.hadoop.fs.FileSystem` so as to provide FS-like behaviour.

Given minio is nominally an S3 clone, it sounds like there's a problem here 
setting up the Hadoop S3A client to bind to it. I'd isolate that to the Hadoop 
code before going near Spark, test on Hadoop 2.8, and file bugs against Hadoop 
and/or minio if there are problems. AFAIK, nobody has run the Hadoop S3A 
[tests|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md]
 against minio; doing that and documenting how to configure the client would be 
a welcome contribution. If minio is 100% S3 compatible (v2/v4 auth + multipart 
PUT; encryption optional), then the S3A client should work with it; it could 
work as another integration test for minio.
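For anyone who does want to try, binding the S3A client to a third-party endpoint is 
configuration along these lines. This is a sketch: the endpoint, credentials, and the 
path-style/SSL choices are assumptions about the minio deployment, and 
fs.s3a.path.style.access needs Hadoop 2.8+.
{code}
import org.apache.spark.sql.SparkSession

// Point S3A at an S3-compatible store instead of AWS.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.endpoint", "minio.example.com:9000")
  .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")  // assuming a plain-HTTP endpoint
  .config("spark.hadoop.fs.s3a.path.style.access", "true")        // virtual-host-style buckets usually aren't available
  .config("spark.hadoop.fs.s3a.access.key", "MINIO_ACCESS_KEY")
  .config("spark.hadoop.fs.s3a.secret.key", "MINIO_SECRET_KEY")
  .getOrCreate()

spark.read.text("s3a://mybucket/some/object.txt").show()
{code}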

> DataFrameReader conceptually inadequate
> ---
>
> Key: SPARK-19582
> URL: https://issues.apache.org/jira/browse/SPARK-19582
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.1.0
>Reporter: James Q. Arnold
>
> DataFrameReader assumes it "understands" all data sources (local file system, 
> object stores, jdbc, ...).  This seems limiting in the long term, imposing 
> both development costs to accept new sources and dependency issues for 
> existing sources (how to coordinate the XX jar for internal use vs. the XX 
> jar used by the application).  Unless I have missed how this can be done 
> currently, an application with an unsupported data source cannot create the 
> required RDD for distribution.
> I recommend at least providing a text API for supplying data.  Let the 
> application provide data as a String (or char[] or ...)---not a path, but the 
> actual data.  Alternatively, provide interfaces or abstract classes the 
> application could provide to let the application handle external data 
> sources, without forcing all that complication into the Spark implementation.
> I don't have any code to submit, but JIRA seemed like to most appropriate 
> place to raise the issue.
> Finally, if I have overlooked how this can be done with the current API, a 
> new example would be appreciated.
> Additional detail...
> We use the minio object store, which provides an API compatible with AWS-S3.  
> A few configuration/parameter values differ for minio, but one can use the 
> AWS library in the application to connect to the minio server.
> When trying to use minio objects through spark, the s3://xxx paths are 
> intercepted by spark and handed to hadoop.  So far, I have been unable to 
> find the right combination of configuration values and parameters to 
> "convince" hadoop to forward the right information to work with minio.  If I 
> could read the minio object in the application, and then hand the object 
> contents directly to spark, I could bypass hadoop and solve the problem.  
> Unfortunately, the underlying spark design prevents that.  So, I see two 
> problems.
> -  Spark seems to have taken on the responsibility of "knowing" the API 
> details of all data sources.  This seems iffy in the long run (and is the 
> root of my current problem).  In the long run, it seems unwise to assume that 
> spark should understand all possible path names, protocols, etc.  Moreover, 
> passing S3 paths to hadoop seems a little odd (why not go directly to AWS, 
> for example).  This particular confusion about S3 shows the difficulties that 
> are bound to occur.
> -  Second, spark appears not to have a way to bypass the path name 
> interpretation.  At the least, spark could provide a text/blob interface, 
> letting the application supply the data object and avoid path interpretation 
> inside spark.  Alternatively, spark could accept a reader/stream/... to build 
> the object, again letting the application provide the implementation of the 
> object input.
> As I mentioned above, I might be missing something in the API that lets us 
> work around the problem.  I'll keep looking, but the API as apparently 
> structured seems too limiting.






[jira] [Resolved] (SPARK-20560) Review Spark's handling of filesystems returning "localhost" in getFileBlockLocations

2017-05-02 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran resolved SPARK-20560.

Resolution: Invalid

"localhost" is filtered, been done in {{HadoopRDD.getPreferredLocations()}} 
since commit #06aac8a.



> Review Spark's handling of filesystems returning "localhost" in 
> getFileBlockLocations
> -
>
> Key: SPARK-20560
> URL: https://issues.apache.org/jira/browse/SPARK-20560
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Steve Loughran
>Priority: Minor
>
> Some filesystems (S3a, Azure WASB) return "localhost" as the response to 
> {{FileSystem.getFileBlockLocations(path)}}. If this is then used as the 
> preferred host when scheduling work, there's a risk that work will be queued 
> on one host, rather than spread across the cluster.
> HIVE-14060 and TEZ-3291 have both seen it in their schedulers.
> I don't know if Spark does it, someone needs to look at the code, maybe write 
> some tests






[jira] [Updated] (SPARK-21137) Spark reads many small files slowly off local filesystem

2017-06-28 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-21137:
---
Summary: Spark reads many small files slowly off local filesystem  (was: 
Spark reads many small files slowly)

> Spark reads many small files slowly off local filesystem
> 
>
> Key: SPARK-21137
> URL: https://issues.apache.org/jira/browse/SPARK-21137
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: sam
>Priority: Minor
>
> A very common use case in big data is to read a large number of small files.  
> For example the Enron email dataset has 1,227,645 small files.
> When one tries to read this data using Spark one will hit many issues.  
> Firstly, even if the data is small (each file only say 1K) any job can take a 
> very long time (I have a simple job that has been running for 3 hours and has 
> not yet got to the point of starting any tasks, I doubt if it will ever 
> finish).
> It seems all the code in Spark that manages file listing is single threaded 
> and not well optimised.  When I hand crank the code and don't use Spark, my 
> job runs much faster.
> Is it possible that I'm missing some configuration option? It seems kinda 
> surprising to me that Spark cannot read Enron data given that it's such a 
> quintessential example.
> So it takes 1 hour to output a line "1,227,645 input paths to process", it 
> then takes another hour to output the same line. Then it outputs a CSV of all 
> the input paths (so creates a text storm).
> Now it's been stuck on the following:
> {code}
> 17/06/19 09:31:07 INFO LzoCodec: Successfully loaded & initialized native-lzo 
> library [hadoop-lzo rev 154f1ef53e2d6ed126b0957d7995e0a610947608]
> {code}
> for 2.5 hours.
> So I've provided full reproduce steps here (including code and cluster setup) 
> https://github.com/samthebest/scenron, scroll down to "Bug In Spark". You can 
> easily just clone, and follow the README to reproduce exactly!
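
Added for illustration, a hedged sketch of the "hand cranked" listing mentioned
above: walk one level of the directory tree with Hadoop's own API, in parallel
across the top-level directories, then hand Spark the explicit file list
instead of a glob. The path and layout are illustrative assumptions.

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val root = new Path("file:///data/enron")
val fs = root.getFileSystem(conf)
val files = fs.listStatus(root).par              // parallel over top-level dirs
  .flatMap(dir => fs.listStatus(dir.getPath))    // one level of children
  .map(_.getPath.toString)
  .seq
// spark.read.textFile(files: _*) would then skip Spark's own glob expansion.
{code}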



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20886) HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null

2017-09-19 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171386#comment-16171386
 ] 

Steve Loughran commented on SPARK-20886:


No, but related. This handles the situation where the committer is a classic
FileOutputCommitter (or a subclass, like the Parquet one), but returns null
when you ask for its working dir. SPARK-21549 looks like there's a hard-coded
expectation of a dest dir being set via the (private) config option used by
FileOutputCommitter, NPEing if it's not there.
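
A minimal sketch of the {{require()}}-style validation being described; the
method name is illustrative, not the actual Spark patch.

{code:java}
import org.apache.hadoop.mapreduce.OutputCommitter
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Fail with a clear message instead of an NPE later on.
def validateWorkPath(committer: OutputCommitter): Unit = committer match {
  case f: FileOutputCommitter =>
    require(f.getWorkPath != null,
      s"$f has a null work path; was it set up with a JobAttemptContext " +
        "rather than a TaskAttemptContext?")
  case _ => // non-file committers manage their own working directories
}
{code}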

> HadoopMapReduceCommitProtocol to fail with message if 
> FileOutputCommitter.getWorkPath==null
> ---
>
> Key: SPARK-20886
> URL: https://issues.apache.org/jira/browse/SPARK-20886
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Steve Loughran
>Assignee: Steve Loughran
>Priority: Trivial
> Fix For: 2.3.0
>
>
> This is minor, and the root cause is my fault *elsewhere*, but it's the patch
> I used to track down the problem.
> If {{HadoopMapReduceCommitProtocol}} has a {{FileOutputCommitter}} for 
> committing things, and *somehow* that's been configured with a 
> {{JobAttemptContext}}, not a {{TaskAttemptContext}}, then the committer NPEs.
> A {{require()}} statement can validate the working path and so point the 
> blame at whoever's code is confused.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-09-19 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171393#comment-16171393
 ] 

Steve Loughran commented on SPARK-21549:


Linking to SPARK-20045, which highlights that the commit logic, especially the
abort code, needs to be resilient to failures, including exceptions raised by
the invoked {{OutputCommitter.abort()}} calls. While people implementing
committers should be expected to write resilient abort routines, you can't
rely on it. Same for calling fs.delete()...it could also fail, so wrapping
everything in an exception handler would at least make abort resilient.
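
A hedged sketch of that "wrap everything" idea; the object and method names
are illustrative, not Spark's actual code.

{code:java}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, OutputCommitter}
import org.slf4j.LoggerFactory

object ResilientAbort {
  private val log = LoggerFactory.getLogger(getClass)

  // Failures from abortJob() and from deleting the staging directory are
  // logged rather than rethrown, so the abort path itself cannot throw.
  def abortQuietly(committer: OutputCommitter,
                   context: JobContext,
                   fs: FileSystem,
                   stagingDir: Path): Unit = {
    try committer.abortJob(context, JobStatus.State.FAILED)
    catch { case e: Exception => log.warn(s"abortJob failed: $e", e) }
    try fs.delete(stagingDir, true)
    catch { case e: Exception => log.warn(s"delete of $stagingDir failed: $e", e) }
  }
}
{code}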

> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use 
> *mapreduce.output.fileoutputformat.outputdir* standard hadoop property.
> [But spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [commiting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In that cases when the job completes then following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.(Path.java:135)
>   at org.apache.hadoop.fs.Path.(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>   ...
> {code}
> So it seems that all the jobs which use OutputFormats which don't write data 
> into HDFS-compatible file systems are broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-09-19 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171420#comment-16171420
 ] 

Steve Loughran commented on SPARK-21549:


# you can't rely on the committers having output and temp dirs. Subclasses of 
{{FileOutputCommitter}} *must*, though there's no official mechanism for 
querying that because {{getOutputPath()}} is private.
# Hadoop 3.0 has added (MAPREDUCE-6956) a new superclass of
{{FileOutputCommitter}},
[PathOutputCommitter|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java],
which pulls up the getWorkingDir method to be more general (so that you can
have output committers which tell Spark and Hive where their intermediate data
should go, without them being subclasses of FileOutputCommitter).
# I'm happy to pull up {{getOutputPath}} to that class, and if people can give 
me a patch for it *this week* I'll add it for 3.0 beta 1. 

Regarding the committer here, you might want to think of moving off, or
subclassing, {{HadoopMapReduceCommitProtocol}}. This is what I've done in
[PathOutputCommitProtocol|https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/com/hortonworks/spark/cloud/PathOutputCommitProtocol.scala],
though I can see it's still relying on the superclass to get that properly.
Again, if we can patch the new PathOutputCommitter class with a getOutputPath,
I'll use that here. And yes, while that new mapreduce API will take a long time
to surface in spark core, you can use it independently, from later this year.

If you are playing with different committers out of spark's own codebase, pick
up the ORC hive tests from
[https://github.com/hortonworks-spark/cloud-integration/tree/master/cloud-examples/src/test/scala/org/apache/spark/sql/sources].
These are just some of the spark sql tests, reworked slightly so that they'll
work with any FileSystem impl. rather than just local file:// paths.

Ping me directly if you are playing with new committers, & look at
MAPREDUCE-6823 to see if that'd be useful to you (& how it could be improved,
given it's still not in the codebase).



> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use 
> *mapreduce.output.fileoutputformat.outputdir* standard hadoop property.
> [But spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [commiting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In that cases when the job completes then following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.(Path.java:135)
>   at org.apache.hadoop.fs.Path.(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> 

[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-09-20 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173449#comment-16173449
 ] 

Steve Loughran commented on SPARK-21549:


The {{newTaskTempFileAbsPath()}} method is an interesting spot of code...I'm
still trying to work out when it is actually used. Some committers, like
{{ManifestFileCommitProtocol}}, don't support it at all.

However, if it is used, then your patch is going to cause problems if the dest
FS != the default FS, because then the bit of the protocol which takes that
list of temp files and renames() them into their destination is going to fail.
I think you'd be better off having the committer fail fast when an absolute
path is asked for.
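
A minimal sketch of that fail-fast behaviour; the method shape only mirrors
{{FileCommitProtocol.newTaskTempFileAbsPath}} and is illustrative, not a
drop-in Spark class.

{code:java}
// Refuse absolute-path output up front rather than letting the rename stage
// fail later at job commit.
def newTaskTempFileAbsPath(absoluteDir: String): String =
  throw new UnsupportedOperationException(
    s"This committer cannot write to the absolute path $absoluteDir; " +
      "only output relative to the job's destination is supported")
{code}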

> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use 
> *mapreduce.output.fileoutputformat.outputdir* standard hadoop property.
> [But spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [commiting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In that cases when the job completes then following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.(Path.java:135)
>   at org.apache.hadoop.fs.Path.(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>   ...
> {code}
> So it seems that all the jobs which use OutputFormats which don't write data 
> into HDFS-compatible file systems are broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-17159) Improve FileInputDStream.findNewFiles list performance

2017-09-20 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran resolved SPARK-17159.

Resolution: Won't Fix

Based on the feedback on https://github.com/apache/spark/pull/14731, I'm doing
all my spark+cloud support elsewhere.

Closing as a WONTFIX.

> Improve FileInputDStream.findNewFiles list performance
> --
>
> Key: SPARK-17159
> URL: https://issues.apache.org/jira/browse/SPARK-17159
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.0
> Environment: spark against object stores
>Reporter: Steve Loughran
>Priority: Minor
>
> {{FileInputDStream.findNewFiles()}} is doing a globStatus with a filter that
> calls getFileStatus() on every file, takes the output, and does listStatus()
> on the output.
> This going to suffer on object stores, as dir listing and getFileStatus calls 
> are so expensive. It's clear this is a problem, as the method has code to 
> detect timeouts in the window and warn of problems.
> It should be possible to make this faster



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22163) Design Issue of Spark Streaming that Causes Random Run-time Exception

2017-10-05 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-22163:
---
Priority: Major  (was: Critical)

> Design Issue of Spark Streaming that Causes Random Run-time Exception
> -
>
> Key: SPARK-22163
> URL: https://issues.apache.org/jira/browse/SPARK-22163
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Structured Streaming
>Affects Versions: 2.2.0
> Environment: Spark Streaming
> Kafka
> Linux
>Reporter: Michael N
>
> The application objects can contain List and can be modified dynamically as 
> well.   However, Spark Streaming framework asynchronously serializes the 
> application's objects as the application runs.  Therefore, it causes random 
> run-time exception on the List when Spark Streaming framework happens to 
> serializes the application's objects while the application modifies a List in 
> its own object.  
> In fact, there are multiple bugs reported about
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject
> that are permutations of the same root cause. So the design issue of the
> Spark Streaming framework is that it does this serialization asynchronously.
> Instead, it should either
> 1. do this serialization synchronously. This is preferred to eliminate the 
> issue completely.  Or
> 2. Allow it to be configured per application whether to do this serialization 
> synchronously or asynchronously, depending on the nature of each application.
> Also, Spark documentation should describe the conditions that trigger Spark 
> to do this type of serialization asynchronously, so the applications can work 
> around them until the fix is provided. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22217) ParquetFileFormat to support arbitrary OutputCommitters

2017-10-06 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-22217:
---
Priority: Minor  (was: Major)

> ParquetFileFormat to support arbitrary OutputCommitters
> ---
>
> Key: SPARK-22217
> URL: https://issues.apache.org/jira/browse/SPARK-22217
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Steve Loughran
>Priority: Minor
>
> Although you can choose which committer to write dataframes as parquet data 
> via {{spark.sql.parquet.output.committer.class}}, you get a class cast 
> exception if this is not a 
> {{org.apache.parquet.hadoop.ParquetOutputCommitter}} or subclass.
> This is not consistent with the docs in SQLConf, which says
> bq.  The specified class needs to be a subclass of 
> org.apache.hadoop.mapreduce.OutputCommitter.  Typically, it's also a subclass 
> of org.apache.parquet.hadoop.ParquetOutputCommitter.
> It is simple to relax {{ParquetFileFormat}}'s requirements, though if the 
> user has set
> {{parquet.enable.summary-metadata=true}}, and set a committer which is not a 
> ParquetOutputCommitter, then they won't see the data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22217) ParquetFileFormat to support arbitrary OutputCommitters if parquet.enable.summary-metadata is false

2017-10-06 Thread Steve Loughran (JIRA)
Steve Loughran created SPARK-22217:
--

 Summary: ParquetFileFormat to support arbitrary OutputCommitters 
if parquet.enable.summary-metadata is false
 Key: SPARK-22217
 URL: https://issues.apache.org/jira/browse/SPARK-22217
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.0
Reporter: Steve Loughran


Although you can choose which committer to write dataframes as parquet data via 
{{spark.sql.parquet.output.committer.class}}, you get a class cast exception if 
this is not a {{org.apache.parquet.hadoop.ParquetOutputCommitter}} or subclass.

This is not consistent with the docs in SQLConf, which says

bq.  The specified class needs to be a subclass of 
org.apache.hadoop.mapreduce.OutputCommitter.  Typically, it's also a subclass 
of org.apache.parquet.hadoop.ParquetOutputCommitter.

It is simple to relax {{ParquetFileFormat}}'s requirements, though if the user 
has set
{{parquet.enable.summary-metadata=true}}, and set a committer which is not a 
ParquetOutputCommitter, then they won't see the data.
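
A hedged sketch of the settings involved, assuming a live SparkSession named
{{spark}}; the values are illustrative. With the relaxation described above,
the configured class would only need to be an
org.apache.hadoop.mapreduce.OutputCommitter, provided summary metadata stays
disabled.

{code:java}
spark.conf.set("spark.sql.parquet.output.committer.class",
  "org.apache.parquet.hadoop.ParquetOutputCommitter")  // or another committer
spark.sparkContext.hadoopConfiguration
  .set("parquet.enable.summary-metadata", "false")
{code}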






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22217) ParquetFileFormat to support arbitrary OutputCommitters

2017-10-06 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-22217:
---
Summary: ParquetFileFormat to support arbitrary OutputCommitters  (was: 
ParquetFileFormat to support arbitrary OutputCommitters if 
parquet.enable.summary-metadata is false)

> ParquetFileFormat to support arbitrary OutputCommitters
> ---
>
> Key: SPARK-22217
> URL: https://issues.apache.org/jira/browse/SPARK-22217
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Steve Loughran
>
> Although you can choose which committer to write dataframes as parquet data 
> via {{spark.sql.parquet.output.committer.class}}, you get a class cast 
> exception if this is not a 
> {{org.apache.parquet.hadoop.ParquetOutputCommitter}} or subclass.
> This is not consistent with the docs in SQLConf, which says
> bq.  The specified class needs to be a subclass of 
> org.apache.hadoop.mapreduce.OutputCommitter.  Typically, it's also a subclass 
> of org.apache.parquet.hadoop.ParquetOutputCommitter.
> It is simple to relax {{ParquetFileFormat}}'s requirements, though if the 
> user has set
> {{parquet.enable.summary-metadata=true}}, and set a committer which is not a 
> ParquetOutputCommitter, then they won't see the data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed

2017-10-13 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203495#comment-16203495
 ] 

Steve Loughran commented on SPARK-22240:


We've got a test in HADOOP-14943 which looks at part sizing and confirms you
only get one part back from s3a right now.

> S3 CSV number of partitions incorrectly computed
> 
>
> Key: SPARK-22240
> URL: https://issues.apache.org/jira/browse/SPARK-22240
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0
>Reporter: Arthur Baudry
>
> Reading CSV out of S3 using S3A protocol does not compute the number of 
> partitions correctly in Spark 2.2.0.
> With Spark 2.2.0 I get only one partition when loading a 14GB file
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 1
> {code}
> While in Spark 2.0.2 I had:
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 115
> {code}
> This introduces obvious performance issues in Spark 2.2.0. Maybe there is a 
> property that should be set to have the number of partitions computed 
> correctly.
> I'm aware that the .option("multiline","true") is not supported in Spark 
> 2.0.2, it's not relevant here.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21999) ConcurrentModificationException - Spark Streaming

2017-10-05 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192732#comment-16192732
 ] 

Steve Loughran edited comment on SPARK-21999 at 10/5/17 1:39 PM:
-

Apache projects are all open source, with an open community. The ASF welcomes 
contributions, and does request that people follow an etiquette of constructive 
discourse: https://community.apache.org/contributors/etiquette

Personally, I've always found Sean to be helpful and polite, even when I've 
turned out to be utterly wrong. I'd advise listening to his feedback, and not 
view it as a personal criticism. Whatever problem you have, faulting him isn't 
going to fix it.

Like I said, projects welcome contributors and their contributions.

# One key area where people can get their hands dirty in a project is 
documentation. Do you think some content in the spark streaming guide could be 
improved? Why not add that section & submit a patch for review?
# Because you'll need the latest source tree to hand to do it, before you even 
worry about documenting a problem, why not see if it "has gone away", or at 
least "moved somewhere else"? Working with the latest versions of the code may 
seem leading edge, but it's the best way to get your problems fixed, and the
only way to pick up an immediate patch.
# For any issue in a project, it doesn't get fixed ASAP, at least not in a form 
you can pick up and use unless you have that build environment yourself. If you 
are using a software product built on spark, well, you'll need to wait for that 
supplier to update their release, which happens on their release schedule. For 
the ASF releases, there's a release process and timetable.
# Which means: for now, you have to come up with a solution to your problem.
Given it's related to structure serialization, well, you can implement your own
{{readObject}} and {{writeObject}} methods, and so perhaps wrap the structures
being serialized with something which implements a broader locking structure
across your data (a minimal sketch follows this list). It's what I'd try first,
as it would be an interesting little problem: a data structure which can have
mutating operations invoked while serialization is in progress. Testing this
would be fun...you'd need some kind of mock OutputStream which could block on a
write() so you could guarantee the writer thread was blocking at the correct
place for the mutation call to trigger the failure.
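
A minimal sketch of the wrapper idea in point 4, purely for illustration; the
class and method names are assumptions, not an existing API. Mutation and Java
serialization of the wrapped list share one lock, so a checkpoint thread
serializing the object can no longer race a thread mutating it.

{code:java}
import java.io.{IOException, ObjectOutputStream}

class LockedBuffer[T] extends Serializable {
  private val items = new java.util.ArrayList[T]()

  def add(item: T): Unit = synchronized { items.add(item) }

  def snapshot(): java.util.List[T] =
    synchronized { new java.util.ArrayList[T](items) }

  // Called by Java serialization; holding the same lock as add() prevents a
  // ConcurrentModificationException while the list is being written out.
  @throws[IOException]
  private def writeObject(out: ObjectOutputStream): Unit =
    synchronized { out.defaultWriteObject() }
}
{code}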

As to whether it's architectural? That's something which could be debated. 
Checkpointing the state of streaming applications is one of those challenging 
problems, especially if the notion of "where you are" across multiple input 
streams is hard, and, if the cost of preserving state is high, any attempt to 
block for the operation will hurt throughput. And changing checkpoint 
architectures is so fundamental that its not some one-line patch. 
If you can fix your structures up to be robustly serializable during 
asynchronous writes, you'll get performance as well as robustness.




was (Author: ste...@apache.org):
Apache projects are all open source, with an open community. The ASF welcomes 
contributions, and does request that people follow an etiquette of constructive 
discourse: https://community.apache.org/contributors/etiquette

Personally, I've always found Sean to be helpful and polite, even when I've 
turned out to be utterly wrong. I'd advise listening to his feedback, and not 
view it as a personal criticism. Whatever problem you have, faulting him isn't 
going to fix it.

Like I said, projects welcome contributors and their contributions.

# One key area where people can get their hands dirty in a project is 
documentation. Do you think some content in the spark streaming guide could be 
improved? Why not add that section & submit a patch for review?
# Because you'll need the latest source tree to hand to do it, before you even 
worry about documenting a problem, why not see if it "has gone away", or at 
least "moved somewhere else"? Working with the latest versions of the code may 
seem leading edge, but it's the best way to get your problems fixed, and the
only way to pick up an immediate patch.
# For any issue in a project, it doesn't get fixed ASAP, at least not in a form 
you can pick up and use unless you have that build environment yourself. If you 
are using a software product built on spark, well, you'll need to wait for that 
supplier to update their release, which happens on their release schedule. For 
the ASF releases, there's a release process and timetable.
# Which means: for now, you have to come up with a solution to your problem. 
Given it's related to structure serialization, well, you can implement your own 
{{readObject}} and {{writeObject}} methods, and so perhaps wrap the structures 
being serialized with something which implements a broader locking structure 
across your data. It's what I'd try 

[jira] [Commented] (SPARK-21999) ConcurrentModificationException - Spark Streaming

2017-10-05 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192732#comment-16192732
 ] 

Steve Loughran commented on SPARK-21999:


Apache projects are all open source, with an open community. The ASF welcomes 
contributions, and does request that people follow an etiquette of constructive 
discourse: https://community.apache.org/contributors/etiquette

Personally, I've always found Sean to be helpful and polite, even when I've 
turned out to be utterly wrong. I'd advise listening to his feedback, and not 
view it as a personal criticism. Whatever problem you have, faulting him isn't 
going to fix it.

Like I said, projects welcome contributors and their contributions.

# One key area where people can get their hands dirty in a project is 
documentation. Do you think some content in the spark streaming guide could be 
improved? Why not add that section & submit a patch for review?
# Because you'll need the latest source tree to hand to do it, before you even 
worry about documenting a problem, why not see if it "has gone away", or at 
least "moved somewhere else"? Working with the latest versions of the code may 
seem leading edge, but it's the best way to get your problems fixed, and the
only way to pick up an immediate patch.
# For any issue in a project, it doesn't get fixed ASAP, at least not in a form 
you can pick up and use unless you have that build environment yourself. If you 
are using a software product built on spark, well, you'll need to wait for that 
supplier to update their release, which happens on their release schedule. For 
the ASF releases, there's a release process and timetable.
# Which means: for now, you have to come up with a solution to your problem. 
Given it's related to structure serialization, well, you can implement your own 
{{readObject}} and {{writeObject}} methods, and so perhaps wrap the structures 
being serialized with something which implements a broader locking structure 
across your data. It's what I'd try first, as it would be an interesting little 
problem: A datastructure which can have mutating operations invoked while 
serialization is in progress. Testing this would be fun...you'd need some kind 
of mock OutputStream which could block on a write() so you could guarantee the 
writer thread was blocking at the correct place for the mutation call to 
trigger the failure.

As to whether it's architectural, well, that's something which could be 
debated. Checkpointing the state of streaming applications is one of those 
challenging problems, especially if the notion of "where you are" across 
multiple input streams is hard, and, if the cost of preserving state is high, 
any attempt to block for the operation will hurt throughput. And changing 
checkpoint architectures is so fundamental that it's not some one-line patch.
If you can fix your structures up to be robustly serializable during 
asynchronous writes, you'll get performance as well as robustness.



> ConcurrentModificationException - Spark Streaming
> -
>
> Key: SPARK-21999
> URL: https://issues.apache.org/jira/browse/SPARK-21999
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Michael N
>Priority: Critical
>
> Hi,
> I am using Spark Streaming v2.1.0 with Kafka 0.8.  I am getting 
> ConcurrentModificationException intermittently.  When it occurs, Spark does 
> not honor the specified value of spark.task.maxFailures. So Spark aborts the 
> current batch and fetches the next batch, so it results in lost data. Its
> exception stack is listed below. 
> This instance of ConcurrentModificationException is similar to the issue at 
> https://issues.apache.org/jira/browse/SPARK-17463, which was about 
> Serialization of accumulators in heartbeats.  However, my Spark stream app 
> does not use accumulators. 
> The stack trace listed below occurred on the Spark master in Spark streaming 
> driver at the time of data loss.   
> From the line of code in the first stack trace, can you tell which object 
> Spark was trying to serialize ?  What is the root cause for this issue  ?  
> Because this issue results in lost data as described above, could you have 
> this issue fixed ASAP ?
> Thanks.
> Michael N.,
> 
> Stack trace of Spark Streaming driver
> ERROR JobScheduler:91: Error generating jobs for time 150522493 ms
> org.apache.spark.SparkException: Task not serializable
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>   at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at 

[jira] [Commented] (SPARK-21999) ConcurrentModificationException - Spark Streaming

2017-10-06 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16194332#comment-16194332
 ] 

Steve Loughran commented on SPARK-21999:


Telling a project "their design is wrong" and expecting a co-operative response
isn't going to work, and if something is a core part of the architecture, it's
not going to change. That's why he's closing it. It'd be like going to
linux-kernel dev and demanding that they switch to a microkernel architecture,
or emacs-dev and pointing out that their key encodings make no sense on modern
keyboards. These are all WONTFIX complaints where you aren't going to get any
satisfaction from raising them, so why bother.

bq. About your other points, I already modified my code to get around this 
issue. 

Good to hear. In my many years as a software developer, I've come to realise
that software development is about working around the implementation and
architectural decisions which my predecessors made, and which, in modern times,
don't seem relevant. All while trying not to do the same things yourself. It's
a losing battle, but being able to work around problems is the fundamental
skill that seems to hold.

bq. 1. In the first place, why does Spark serialize the application objects 
asynchronously while the streaming application is running continuously from 
batch to batch ?

Don't know. To find out, I'd find the lines where it takes place, select it in
my IDE, hit "show history for selection", and work back from the pull requests.

bq. 2. If Spark needs to do this type of serialization at all, why does it not 
do at the end of the batch synchronously ?

It's how it checkpoints the state of a streaming application. That's a 
fundamental need. 

bq. But Sean did not provide the answers and instead just kept closing that 
ticket. If he does not know the answers or information for tickets, he should 
let someone else who has such information answers them.

The source is there. And along with the source comes the SCM history, which 
provides the rationale for most decisions.

Now, to close this, and to stop Sean taking all the blame, I'll be closing this 
as a WONTFIX. Please, only re-open if you have something to contribute, in 
particular, as I mentioned, documentation. Grab the latest source, improve the 
streaming docs, follow the spark contribution process & submit a github pull 
request, see how it goes. 




> ConcurrentModificationException - Spark Streaming
> -
>
> Key: SPARK-21999
> URL: https://issues.apache.org/jira/browse/SPARK-21999
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Michael N
>Priority: Critical
>
> Hi,
> I am using Spark Streaming v2.1.0 with Kafka 0.8.  I am getting 
> ConcurrentModificationException intermittently.  When it occurs, Spark does 
> not honor the specified value of spark.task.maxFailures. So Spark aborts the 
> current batch and fetches the next batch, so it results in lost data. Its
> exception stack is listed below. 
> This instance of ConcurrentModificationException is similar to the issue at 
> https://issues.apache.org/jira/browse/SPARK-17463, which was about 
> Serialization of accumulators in heartbeats.  However, my Spark stream app 
> does not use accumulators. 
> The stack trace listed below occurred on the Spark master in Spark streaming 
> driver at the time of data loss.   
> From the line of code in the first stack trace, can you tell which object 
> Spark was trying to serialize ?  What is the root cause for this issue  ?  
> Because this issue results in lost data as described above, could you have 
> this issue fixed ASAP ?
> Thanks.
> Michael N.,
> 
> Stack trace of Spark Streaming driver
> ERROR JobScheduler:91: Error generating jobs for time 150522493 ms
> org.apache.spark.SparkException: Task not serializable
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>   at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
>   at 
> 

[jira] [Resolved] (SPARK-21999) ConcurrentModificationException - Spark Streaming

2017-10-06 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran resolved SPARK-21999.

Resolution: Won't Fix

> ConcurrentModificationException - Spark Streaming
> -
>
> Key: SPARK-21999
> URL: https://issues.apache.org/jira/browse/SPARK-21999
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Michael N
>Priority: Critical
>
> Hi,
> I am using Spark Streaming v2.1.0 with Kafka 0.8.  I am getting 
> ConcurrentModificationException intermittently.  When it occurs, Spark does 
> not honor the specified value of spark.task.maxFailures. So Spark aborts the 
> current batch and fetches the next batch, so it results in lost data. Its
> exception stack is listed below. 
> This instance of ConcurrentModificationException is similar to the issue at 
> https://issues.apache.org/jira/browse/SPARK-17463, which was about 
> Serialization of accumulators in heartbeats.  However, my Spark stream app 
> does not use accumulators. 
> The stack trace listed below occurred on the Spark master in Spark streaming 
> driver at the time of data loss.   
> From the line of code in the first stack trace, can you tell which object 
> Spark was trying to serialize ?  What is the root cause for this issue  ?  
> Because this issue results in lost data as described above, could you have 
> this issue fixed ASAP ?
> Thanks.
> Michael N.,
> 
> Stack trace of Spark Streaming driver
> ERROR JobScheduler:91: Error generating jobs for time 150522493 ms
> org.apache.spark.SparkException: Task not serializable
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>   at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
>   at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>   at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>   at scala.Option.orElse(Option.scala:289)
>   at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>   at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>   at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 

[jira] [Commented] (SPARK-2984) FileNotFoundException on _temporary directory

2017-10-17 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207573#comment-16207573
 ] 

Steve Loughran commented on SPARK-2984:
---

bq. multiple batches writing to same location simultaneously


Hadoop {{FileOutputCommitter}} cleans up $dest/_temporary while committing or 
aborting a job.

if you are writing >1 job to the same directory tree simultaneously, expect the 
job cleanup in one task to break the others. You could try overloading 
ParquetOutputCommitter.cleanupJob() to stop this, but it's probably safer to 
work out why output to the same path is happening in parallel and stop it.
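
If you do go the overloading route, here is a hedged sketch (illustrative only)
of a committer whose job cleanup leaves the shared _temporary directory alone,
so one job's cleanup cannot delete another job's in-flight attempts. Skipping
cleanup leaves stale attempt data behind, which is part of why fixing the
concurrent writes is the safer route.

{code:java}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.hadoop.ParquetOutputCommitter

class NoCleanupParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  override def cleanupJob(context: JobContext): Unit = {
    // intentionally do not call super.cleanupJob(context), which would
    // delete $dest/_temporary for every job writing to this path
  }
}
{code}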

> FileNotFoundException on _temporary directory
> -
>
> Key: SPARK-2984
> URL: https://issues.apache.org/jira/browse/SPARK-2984
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Andrew Ash
>Assignee: Josh Rosen
>Priority: Critical
> Fix For: 1.3.0
>
>
> We've seen several stacktraces and threads on the user mailing list where 
> people are having issues with a {{FileNotFoundException}} stemming from an 
> HDFS path containing {{_temporary}}.
> I ([~aash]) think this may be related to {{spark.speculation}}.  I think the 
> error condition might manifest in this circumstance:
> 1) task T starts on a executor E1
> 2) it takes a long time, so task T' is started on another executor E2
> 3) T finishes in E1 so moves its data from {{_temporary}} to the final 
> destination and deletes the {{_temporary}} directory during cleanup
> 4) T' finishes in E2 and attempts to move its data from {{_temporary}}, but 
> those files no longer exist!  exception
> Some samples:
> {noformat}
> 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming job 
> 140774430 ms.0
> java.io.FileNotFoundException: File 
> hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07
>  does not exist.
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
> at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708)
> at 
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
> at 
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
> at 
> org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
> at 
> org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
> at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:841)
> at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724)
> at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643)
> at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:773)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:771)
> at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
> at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
> at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> -- Chen Song at 
> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFiles-file-not-found-exception-td10686.html
> {noformat}
> I am running a Spark Streaming job that uses saveAsTextFiles to save results 
> into hdfs files. However, it has an exception after 20 batches
> result-140631234/_temporary/0/task_201407251119__m_03 does not 
> exist.
> {noformat}
> and
> {noformat}
> 

[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed

2017-10-13 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203709#comment-16203709
 ] 

Steve Loughran commented on SPARK-22240:


[~hyukjin.kwon]: we now see that on s3a, you only ever get a partition size 
from the FS of 1, which means when spark asks the FS "how many blocks is this 
file in", the answer is 1. That's info which can be used for determining what 
gives best IO performance for multiple executors. In HDFS, for a file in N 
blocks & R replicas, you get N * R actual blocks you can read in parallel. For 
S3 it may look like there is just 1 block, but if you upload in multiple parts 
(as happens when the write gets above some threshold like 64M), then you do get
parallelism on the read. There's no way to determine what the block size was on 
the upload (maybe we could save it as a header in future), but we can get the 
FS to tell the query engine what the current blocksize is & so be consistent 
across writes and reads.

Spark does its own partitioning too, with the default min size & other things 
controlling the decision, e.g. what the user asks for, #of executors, whether 
the format says it can be split (anything .gz == false, etc). So it may just be 
the multiline case. 

The local filesystem only returns one element for getFileBlockLocations(path,
...), so you can just check locally whether the problem exists when
multiline = true vs false.
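
A hedged sketch of that local check, assuming a live {{spark}} session; the
path is illustrative. Comparing the partition count for the same CSV read with
and without multiLine against a file:// path shows whether the collapse to one
partition is multiline-related rather than s3a-related.

{code:java}
val a = spark.read.format("csv").option("header", "true")
  .option("multiLine", "true").load("file:///tmp/big.csv")
val b = spark.read.format("csv").option("header", "true")
  .option("multiLine", "false").load("file:///tmp/big.csv")
println(s"multiLine=true  -> ${a.rdd.getNumPartitions} partitions")
println(s"multiLine=false -> ${b.rdd.getNumPartitions} partitions")
{code}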


> S3 CSV number of partitions incorrectly computed
> 
>
> Key: SPARK-22240
> URL: https://issues.apache.org/jira/browse/SPARK-22240
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0
>Reporter: Arthur Baudry
>
> Reading CSV out of S3 using S3A protocol does not compute the number of 
> partitions correctly in Spark 2.2.0.
> With Spark 2.2.0 I get only one partition when loading a 14GB file
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 1
> {code}
> While in Spark 2.0.2 I had:
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 115
> {code}
> This introduces obvious performance issues in Spark 2.2.0. Maybe there is a 
> property that should be set to have the number of partitions computed 
> correctly.
> I'm aware that the .option("multiline","true") is not supported in Spark 
> 2.0.2, it's not relevant here.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed

2017-10-12 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201863#comment-16201863
 ] 

Steve Loughran commented on SPARK-22240:


thanks. Now for a question which is probably obvious to others:

Is there a straightforward way to determine the number of partitions a file is 
being broken into other than doing a job & counting the # of part- files? I 
guess even if there is, counting the files is straightforward. 

If it's multiline related, do you think this would apply to all filesystems, or 
is it possible that s3a is making things worse?

Anyway, whatever happens here, we'll get a fix for s3a's getBlockLocations
shortly.

> S3 CSV number of partitions incorrectly computed
> 
>
> Key: SPARK-22240
> URL: https://issues.apache.org/jira/browse/SPARK-22240
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0
>Reporter: Arthur Baudry
>
> Reading CSV out of S3 using S3A protocol does not compute the number of 
> partitions correctly in Spark 2.2.0.
> With Spark 2.2.0 I get only one partition when loading a 14GB file
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 1
> {code}
> While in Spark 2.0.2 I had:
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 115
> {code}
> This introduces obvious performance issues in Spark 2.2.0. Maybe there is a 
> property that should be set to have the number of partitions computed 
> correctly.
> I'm aware that the .option("multiline","true") is not supported in Spark 
> 2.0.2, it's not relevant here.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier

2017-10-12 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201917#comment-16201917
 ] 

Steve Loughran commented on SPARK-21797:


Update, in HADOOP-14874 I've noted we could use the existing 
{{FileSystem.getContentSummary(Path)}} API to return {{StorageType.ARCHIVE}} 
for glaciated data. You'd need a way of filtering the listing of source files 
to strip out everything of archive type, but then yes, you could skip data in 
glacier
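
Until a storage-class-aware listing exists, a rough workaround sketch for the dt= layout 
from this issue is to prune the partition directories client-side before handing them to 
Spark. This filters by date rather than storage class; the bucket, paths and dates below 
are just the placeholders from the issue description:

{code}
// Sketch only: keep the dt= partition directories whose date falls in [from, to]
import org.apache.hadoop.fs.Path

val root = new Path("s3://my-bucket/my-dataset/")
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)

val from = "2017-07-15"
val to   = "2017-08-24"

// ISO dates compare correctly as plain strings
val livePaths = fs.listStatus(root)
  .map(_.getPath)
  .filter { p =>
    p.getName.startsWith("dt=") && {
      val dt = p.getName.stripPrefix("dt=")
      dt.compareTo(from) >= 0 && dt.compareTo(to) <= 0
    }
  }
  .map(_.toString)

val df = spark.read
  .option("basePath", root.toString)   // keep dt as a partition column
  .parquet(livePaths: _*)
{code}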

> spark cannot read partitioned data in S3 that are partly in glacier
> ---
>
> Key: SPARK-21797
> URL: https://issues.apache.org/jira/browse/SPARK-21797
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Amazon EMR
>Reporter: Boris Clémençon 
>  Labels: glacier, partitions, read, s3
>
> I have a dataset in parquet in S3 partitioned by date (dt) with oldest date 
> stored in AWS Glacier to save some money. For instance, we have...
> {noformat}
> s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier]
> s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier]
> {noformat}
> I want to read this dataset, but only a subset of date that are not yet in 
> glacier, eg:
> {code:java}
> val from = "2017-07-15"
> val to = "2017-08-24"
> val path = "s3://my-bucket/my-dataset/"
> val X = spark.read.parquet(path).where(col("dt").between(from, to))
> {code}
> Unfortunately, I have the exception
> {noformat}
> java.io.IOException: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  The operation is not valid for the object's storage class (Service: Amazon 
> S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: 
> C444D508B6042138)
> {noformat}
> It seems that Spark does not like a partitioned dataset when some partitions are 
> in Glacier. I could always read specifically each date, add the column with 
> current date and reduce(_ union _) at the end, but not pretty and it should 
> not be necessary.
> Is there any tip to read available data in the datastore even with old data 
> in glacier?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed

2017-10-13 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203966#comment-16203966
 ] 

Steve Loughran commented on SPARK-22240:


Point me at a simple test suite for the multiline & I'll see what I can do on 
s3a to explore its behaviour

> S3 CSV number of partitions incorrectly computed
> 
>
> Key: SPARK-22240
> URL: https://issues.apache.org/jira/browse/SPARK-22240
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0
>Reporter: Arthur Baudry
>
> Reading CSV out of S3 using S3A protocol does not compute the number of 
> partitions correctly in Spark 2.2.0.
> With Spark 2.2.0 I get only one partition when loading a 14GB file
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 1
> {code}
> While in Spark 2.0.2 I had:
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 115
> {code}
> This introduces obvious performance issues in Spark 2.2.0. Maybe there is a 
> property that should be set to have the number of partitions computed 
> correctly.
> I'm aware that the .option("multiline","true") is not supported in Spark 
> 2.0.2, it's not relevant here.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier

2017-08-29 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146193#comment-16146193
 ] 

Steve Loughran commented on SPARK-21797:


No, that's a shame. I only came across the option when I pasted the stack trace 
in the IDE, and it said "enable this option". Sorry, I'm not sure about what 
other strategies there are. Sean? Any idea?

> spark cannot read partitioned data in S3 that are partly in glacier
> ---
>
> Key: SPARK-21797
> URL: https://issues.apache.org/jira/browse/SPARK-21797
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Amazon EMR
>Reporter: Boris Clémençon 
>  Labels: glacier, partitions, read, s3
>
> I have a dataset in parquet in S3 partitioned by date (dt) with oldest date 
> stored in AWS Glacier to save some money. For instance, we have...
> {noformat}
> s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier]
> s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier]
> {noformat}
> I want to read this dataset, but only a subset of date that are not yet in 
> glacier, eg:
> {code:java}
> val from = "2017-07-15"
> val to = "2017-08-24"
> val path = "s3://my-bucket/my-dataset/"
> val X = spark.read.parquet(path).where(col("dt").between(from, to))
> {code}
> Unfortunately, I have the exception
> {noformat}
> java.io.IOException: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  The operation is not valid for the object's storage class (Service: Amazon 
> S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: 
> C444D508B6042138)
> {noformat}
> It seems that Spark does not like a partitioned dataset when some partitions are 
> in Glacier. I could always read specifically each date, add the column with 
> current date and reduce(_ union _) at the end, but not pretty and it should 
> not be necessary.
> Is there any tip to read available data in the datastore even with old data 
> in glacier?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20448) Document how FileInputDStream works with object storage

2017-09-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178236#comment-16178236
 ] 

Steve Loughran commented on SPARK-20448:


thanks!

> Document how FileInputDStream works with object storage
> ---
>
> Key: SPARK-20448
> URL: https://issues.apache.org/jira/browse/SPARK-20448
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation
>Affects Versions: 2.1.0
>Reporter: Steve Loughran
>Assignee: Steve Loughran
>Priority: Minor
> Fix For: 2.3.0
>
>
> Object stores work differently from filesystems: intermediate writes not 
> visible, renames are really O(data) copies, not O(1) transactions.
> This makes working with them as DStreams fundamentally different: you can 
> write straight into the destination.
> 1. Document how FileinputDStreams scan directories for changes
> 2. Document how object stores behave differently, and the implications
> for users.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-2356) Exception: Could not locate executable null\bin\winutils.exe in the Hadoop

2017-09-29 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran resolved SPARK-2356.
---
Resolution: Duplicate

> Exception: Could not locate executable null\bin\winutils.exe in the Hadoop 
> ---
>
> Key: SPARK-2356
> URL: https://issues.apache.org/jira/browse/SPARK-2356
> Project: Spark
>  Issue Type: Bug
>  Components: Windows
>Affects Versions: 1.0.0, 1.1.1, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 1.4.1, 1.5.0, 
> 1.5.1, 1.5.2
>Reporter: Kostiantyn Kudriavtsev
>Priority: Critical
>
> I'm trying to run some transformation on Spark, it works fine on cluster 
> (YARN, linux machines). However, when I'm trying to run it on local machine 
> (Windows 7) under unit test, I got errors (I don't use Hadoop, I'm read file 
> from local filesystem):
> {code}
> 14/07/02 19:59:31 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 14/07/02 19:59:31 ERROR Shell: Failed to locate the winutils binary in the 
> hadoop binary path
> java.io.IOException: Could not locate executable null\bin\winutils.exe in the 
> Hadoop binaries.
>   at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
>   at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
>   at org.apache.hadoop.util.Shell.(Shell.java:326)
>   at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
>   at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
>   at org.apache.hadoop.security.Groups.(Groups.java:77)
>   at 
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
>   at 
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
>   at 
> org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:36)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:109)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala)
>   at org.apache.spark.SparkContext.(SparkContext.scala:228)
>   at org.apache.spark.SparkContext.(SparkContext.scala:97)
> {code}
> It's happened because Hadoop config is initialized each time when spark 
> context is created regardless is hadoop required or not.
> I propose to add some special flag to indicate if hadoop config is required 
> (or start this configuration manually)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2356) Exception: Could not locate executable null\bin\winutils.exe in the Hadoop

2017-09-29 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185744#comment-16185744
 ] 

Steve Loughran commented on SPARK-2356:
---

[~Vasilina], that probably means you're running with Hadoop <=2.7; the more 
helpful message only went in with HADOOP-10775. Sorry.

I'm about to close this as a duplicate of HADOOP-10775, as really it is a 
config problem (plus the need for the Hadoop libs to have a copy of 
winutils.exe around for file operations). All that can be done, short of 
removing that dependency, is fixing the error message, and we've done our best 
at that.

> Exception: Could not locate executable null\bin\winutils.exe in the Hadoop 
> ---
>
> Key: SPARK-2356
> URL: https://issues.apache.org/jira/browse/SPARK-2356
> Project: Spark
>  Issue Type: Bug
>  Components: Windows
>Affects Versions: 1.0.0, 1.1.1, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 1.4.1, 1.5.0, 
> 1.5.1, 1.5.2
>Reporter: Kostiantyn Kudriavtsev
>Priority: Critical
>
> I'm trying to run some transformation on Spark, it works fine on cluster 
> (YARN, linux machines). However, when I'm trying to run it on local machine 
> (Windows 7) under unit test, I got errors (I don't use Hadoop, I'm read file 
> from local filesystem):
> {code}
> 14/07/02 19:59:31 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 14/07/02 19:59:31 ERROR Shell: Failed to locate the winutils binary in the 
> hadoop binary path
> java.io.IOException: Could not locate executable null\bin\winutils.exe in the 
> Hadoop binaries.
>   at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
>   at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
>   at org.apache.hadoop.util.Shell.(Shell.java:326)
>   at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
>   at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
>   at org.apache.hadoop.security.Groups.(Groups.java:77)
>   at 
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
>   at 
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
>   at 
> org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:36)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:109)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala)
>   at org.apache.spark.SparkContext.(SparkContext.scala:228)
>   at org.apache.spark.SparkContext.(SparkContext.scala:97)
> {code}
> It's happened because Hadoop config is initialized each time when spark 
> context is created regardless is hadoop required or not.
> I propose to add some special flag to indicate if hadoop config is required 
> (or start this configuration manually)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21817) Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex

2017-08-23 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138683#comment-16138683
 ] 

Steve Loughran commented on SPARK-21817:


I think it's a regression in HDFS-6984; the superclass handles permissions 
being null, but the modified LocatedFileStatus ctor doesn't.

Ewan: do a patch there with a new test method (where?) & I'll review it.

> Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex
> --
>
> Key: SPARK-21817
> URL: https://issues.apache.org/jira/browse/SPARK-21817
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ewan Higgs
>Priority: Minor
> Attachments: SPARK-21817.001.patch
>
>
> The implementation of HDFS-6984 now uses the passed in {{FSPermission}} to 
> pull out the ACL and other information. Therefore passing in a {{null}} is no 
> longer adequate and hence causes a NPE when listing files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21817) Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex

2017-08-23 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138664#comment-16138664
 ] 

Steve Loughran commented on SPARK-21817:


This a regression in HDFS?

> Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex
> --
>
> Key: SPARK-21817
> URL: https://issues.apache.org/jira/browse/SPARK-21817
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ewan Higgs
>Priority: Minor
> Attachments: SPARK-21817.001.patch
>
>
> The implementation of HDFS-6984 now uses the passed in {{FSPermission}} to 
> pull out the ACL and other information. Therefore passing in a {{null}} is no 
> longer adequate and hence causes a NPE when listing files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21817) Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex

2017-08-23 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138685#comment-16138685
 ] 

Steve Loughran commented on SPARK-21817:


The API is tagged as stable/evolving; it's clearly in use downstream, so strictly, 
yep, it's broken, and easily fixed. Just not a codepath tested in HDFS.

> Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex
> --
>
> Key: SPARK-21817
> URL: https://issues.apache.org/jira/browse/SPARK-21817
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ewan Higgs
>Priority: Minor
> Attachments: SPARK-21817.001.patch
>
>
> The implementation of HDFS-6984 now uses the passed in {{FSPermission}} to 
> pull out the ACL and other information. Therefore passing in a {{null}} is no 
> longer adequate and hence causes a NPE when listing files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21702) Structured Streaming S3A SSE Encryption Not Visible through AWS S3 GUI when PartitionBy Used

2017-08-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139898#comment-16139898
 ] 

Steve Loughran commented on SPARK-21702:


If this is just about "directories", then there are no directories in S3. We create 
some mock ones for empty dirs (i.e. after a mkdirs() call) through 0-byte 
objects. We then delete all such 0-byte objects when you write data underneath 
(see {{S3AFileSystem.deleteUnnecessaryFakeDirectories(Path)}}).

I think that's what's been causing the confusion.

I'm going to close this one as invalid, sorry.

FWIW, if you do want to guarantee data in a bucket is encrypted, set the bucket 
policy to mandate this. It's the best way to be confident that all your data is 
locked down: [https://hortonworks.github.io/hdp-aws/s3-encryption/index.html]



> Structured Streaming S3A SSE Encryption Not Visible through AWS S3 GUI when 
> PartitionBy Used
> 
>
> Key: SPARK-21702
> URL: https://issues.apache.org/jira/browse/SPARK-21702
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
> Environment: Hadoop 2.7.3: AWS SDK 1.7.4
> Hadoop 2.8.1: AWS SDK 1.10.6
>Reporter: George Pongracz
>Priority: Minor
>  Labels: security
>
> Settings:
>   .config("spark.hadoop.fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
>   .config("spark.hadoop.fs.s3a.server-side-encryption-algorithm", 
> "AES256")
> When writing to an S3 sink from structured streaming the files are being 
> encrypted using AES-256
> When introducing a "PartitionBy" the output data files are unencrypted. 
> All other supporting files, metadata are encrypted
> Suspect write to temp is encrypted and move/rename is not applying the SSE.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21702) Structured Streaming S3A SSE Encryption Not Visible through AWS S3 GUI when PartitionBy Used

2017-08-24 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran resolved SPARK-21702.

Resolution: Invalid

> Structured Streaming S3A SSE Encryption Not Visible through AWS S3 GUI when 
> PartitionBy Used
> 
>
> Key: SPARK-21702
> URL: https://issues.apache.org/jira/browse/SPARK-21702
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
> Environment: Hadoop 2.7.3: AWS SDK 1.7.4
> Hadoop 2.8.1: AWS SDK 1.10.6
>Reporter: George Pongracz
>Priority: Minor
>  Labels: security
>
> Settings:
>   .config("spark.hadoop.fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
>   .config("spark.hadoop.fs.s3a.server-side-encryption-algorithm", 
> "AES256")
> When writing to an S3 sink from structured streaming the files are being 
> encrypted using AES-256
> When introducing a "PartitionBy" the output data files are unencrypted. 
> All other supporting files, metadata are encrypted
> Suspect write to temp is encrypted and move/rename is not applying the SSE.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier

2017-08-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140138#comment-16140138
 ] 

Steve Loughran commented on SPARK-21797:


I was talking about the cost and time of getting data from Glacier. If that's 
the only place where data lives, then it's slow and expensive. And that's the 
bit I'm describing as niche. Given I've been working full time on S3A, I'm 
reasonably confident it gets used a lot.

If you talk to data in S3 that has been backed up to Glacier, you *will get a 
403*. According to Jeff Barr himself: 
https://aws.amazon.com/blogs/aws/archive-s3-to-glacier/

bq. If you archive objects using the Glacier storage option, you must inspect 
the storage class of an object before you attempt to retrieve it. The customary 
GET request will work as expected if the object is stored in S3 Standard or 
Reduced Redundancy (RRS) storage. It will fail (with a 403 error) if the object 
is archived in Glacier. In this case, you must use the RESTORE operation 
(described below) to make your data available in S3.

bq. You use S3’s new RESTORE operation to access an object archived in Glacier. 
As part of the request, you need to specify a retention period in days. 
Restoring an object will generally take 3 to 5 hours. Your restored object will 
remain in both Glacier and S3’s Reduced Redundancy Storage (RRS) for the 
duration of the retention period. At the end of the retention period the 
object’s data will be removed from S3; the object will remain in Glacier.

Like I said, I'd be interested in getting the full stack trace if you try to 
read this with an S3A client. Not for fixing, but for better reporting. 
Probably point them at Jeff's blog entry. Or this JIRA :)


> spark cannot read partitioned data in S3 that are partly in glacier
> ---
>
> Key: SPARK-21797
> URL: https://issues.apache.org/jira/browse/SPARK-21797
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Boris Clémençon 
>  Labels: glacier, partitions, read, s3
>
> I have a dataset in parquet in S3 partitioned by date (dt) with oldest date 
> stored in AWS Glacier to save some money. For instance, we have...
> {noformat}
> s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier]
> s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier]
> {noformat}
> I want to read this dataset, but only a subset of date that are not yet in 
> glacier, eg:
> {code:java}
> val from = "2017-07-15"
> val to = "2017-08-24"
> val path = "s3://my-bucket/my-dataset/"
> val X = spark.read.parquet(path).where(col("dt").between(from, to))
> {code}
> Unfortunately, I have the exception
> {noformat}
> java.io.IOException: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  The operation is not valid for the object's storage class (Service: Amazon 
> S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: 
> C444D508B6042138)
> {noformat}
> It seems that Spark does not like a partitioned dataset when some partitions are 
> in Glacier. I could always read specifically each date, add the column with 
> current date and reduce(_ union _) at the end, but not pretty and it should 
> not be necessary.
> Is there any tip to read available data in the datastore even with old data 
> in glacier?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier

2017-08-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140147#comment-16140147
 ] 

Steve Loughran commented on SPARK-21797:


Note that if it is just during spark partition calculation, it's probable that 
it is going down the directory tree and inspecting the files through HEAD 
requests, maybe looking at metadata entries too. So do attach the s3a & spark 
trace so we can see what's going on, as something may be over-enthusiastic about 
looking at files, or we could have something recognise the problem and recover 
from it.

> spark cannot read partitioned data in S3 that are partly in glacier
> ---
>
> Key: SPARK-21797
> URL: https://issues.apache.org/jira/browse/SPARK-21797
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Boris Clémençon 
>  Labels: glacier, partitions, read, s3
>
> I have a dataset in parquet in S3 partitioned by date (dt) with oldest date 
> stored in AWS Glacier to save some money. For instance, we have...
> {noformat}
> s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier]
> s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier]
> {noformat}
> I want to read this dataset, but only a subset of date that are not yet in 
> glacier, eg:
> {code:java}
> val from = "2017-07-15"
> val to = "2017-08-24"
> val path = "s3://my-bucket/my-dataset/"
> val X = spark.read.parquet(path).where(col("dt").between(from, to))
> {code}
> Unfortunately, I have the exception
> {noformat}
> java.io.IOException: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  The operation is not valid for the object's storage class (Service: Amazon 
> S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: 
> C444D508B6042138)
> {noformat}
> It seems that Spark does not like a partitioned dataset when some partitions are 
> in Glacier. I could always read specifically each date, add the column with 
> current date and reduce(_ union _) at the end, but not pretty and it should 
> not be necessary.
> Is there any tip to read available data in the datastore even with old data 
> in glacier?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21817) Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex

2017-08-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140161#comment-16140161
 ] 

Steve Loughran commented on SPARK-21817:


FYI, this is now fixed in hadoop trunk/3.0-beta-1

> Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex
> --
>
> Key: SPARK-21817
> URL: https://issues.apache.org/jira/browse/SPARK-21817
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ewan Higgs
>Priority: Minor
> Attachments: SPARK-21817.001.patch
>
>
> The implementation of HDFS-6984 now uses the passed in {{FSPermission}} to 
> pull out the ACL and other information. Therefore passing in a {{null}} is no 
> longer adequate and hence causes a NPE when listing files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier

2017-08-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139919#comment-16139919
 ] 

Steve Loughran commented on SPARK-21797:


If you are using s3:// URLs then it's the AWS team's problem. If you were using 
s3a://, then it'd be something you ask the hadoop team to look at, but we'd say 
no as

* it's a niche use case
* It's really slow, as in "read() takes so long other bits of the system will 
start to think your worker is hanging". Which means if you have speculative 
execution turned on, they kick off other workers to read the data.
* It's a very, very expensive way to work with data; $0.03/GB, which ramps up 
fast once multiple spark workers start reading the same datasets in parallel.
* Finally, it's been rejected on the server with a 403 response. That's Amazon 
S3 saying "no", not any of the clients.

You shouldn't be trying to process data direct from Glacier. Copy it to S3 or a 
transient HDFS cluster first, maybe as part of an Oozie or Airflow workflow.

I'd be curious about the full stack trace you see if you do try this with s3a://, 
even though it'll still be a WONTFIX. We could at least go for a more 
meaningful exception translation, and the retry logic needs to know that the 
error won't go away if you try again.

> spark cannot read partitioned data in S3 that are partly in glacier
> ---
>
> Key: SPARK-21797
> URL: https://issues.apache.org/jira/browse/SPARK-21797
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Boris Clémençon 
>  Labels: glacier, partitions, read, s3
>
> I have a dataset in parquet in S3 partitioned by date (dt) with oldest date 
> stored in AWS Glacier to save some money. For instance, we have...
> {noformat}
> s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier]
> s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier]
> {noformat}
> I want to read this dataset, but only a subset of date that are not yet in 
> glacier, eg:
> {code:java}
> val from = "2017-07-15"
> val to = "2017-08-24"
> val path = "s3://my-bucket/my-dataset/"
> val X = spark.read.parquet(path).where(col("dt").between(from, to))
> {code}
> Unfortunately, I have the exception
> {noformat}
> java.io.IOException: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  The operation is not valid for the object's storage class (Service: Amazon 
> S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: 
> C444D508B6042138)
> {noformat}
> It seems that Spark does not like a partitioned dataset when some partitions are 
> in Glacier. I could always read specifically each date, add the column with 
> current date and reduce(_ union _) at the end, but not pretty and it should 
> not be necessary.
> Is there any tip to read available data in the datastore even with old data 
> in glacier?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier

2017-08-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140408#comment-16140408
 ] 

Steve Loughran commented on SPARK-21797:


This is happening deep in the Amazon EMR team's closed source {{EmrFileSystem}}, 
so nothing anyone here at the ASF can deal with directly; I'm confident S3A 
will handle it pretty similarly though, either in the open() call or shortly 
afterwards, in the first read(). All we could do there is convert to a more 
meaningful error, or actually check to see if the file is valid at open() time 
& again, fail meaningfully

At the Spark level, it's because Parquet is trying to read the footer of every 
file in parallel

the good news, you can tell Spark to ignore files it can't read. I believe this 
might be a quick workaround:
{code}
spark.sql.files.ignoreCorruptFiles=true
{code}

Let us know what happens
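
For reference, a minimal sketch of applying that setting from a spark-shell session; the 
path and date range are the placeholders from the issue description, and whether 
Glacier-archived files actually get skipped this way still needs confirming:

{code}
// Sketch only: ask Spark to skip files it cannot read instead of failing the scan
import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val df = spark.read
  .parquet("s3://my-bucket/my-dataset/")
  .where(col("dt").between("2017-07-15", "2017-08-24"))
{code}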


> spark cannot read partitioned data in S3 that are partly in glacier
> ---
>
> Key: SPARK-21797
> URL: https://issues.apache.org/jira/browse/SPARK-21797
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Boris Clémençon 
>  Labels: glacier, partitions, read, s3
>
> I have a dataset in parquet in S3 partitioned by date (dt) with oldest date 
> stored in AWS Glacier to save some money. For instance, we have...
> {noformat}
> s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier]
> s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier]
> {noformat}
> I want to read this dataset, but only a subset of date that are not yet in 
> glacier, eg:
> {code:java}
> val from = "2017-07-15"
> val to = "2017-08-24"
> val path = "s3://my-bucket/my-dataset/"
> val X = spark.read.parquet(path).where(col("dt").between(from, to))
> {code}
> Unfortunately, I have the exception
> {noformat}
> java.io.IOException: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  The operation is not valid for the object's storage class (Service: Amazon 
> S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: 
> C444D508B6042138)
> {noformat}
> It seems that Spark does not like a partitioned dataset when some partitions are 
> in Glacier. I could always read specifically each date, add the column with 
> current date and reduce(_ union _) at the end, but not pretty and it should 
> not be necessary.
> Is there any tip to read available data in the datastore even with old data 
> in glacier?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier

2017-08-24 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-21797:
---
Environment: Amazon EMR

> spark cannot read partitioned data in S3 that are partly in glacier
> ---
>
> Key: SPARK-21797
> URL: https://issues.apache.org/jira/browse/SPARK-21797
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Amazon EMR
>Reporter: Boris Clémençon 
>  Labels: glacier, partitions, read, s3
>
> I have a dataset in parquet in S3 partitioned by date (dt) with oldest date 
> stored in AWS Glacier to save some money. For instance, we have...
> {noformat}
> s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier]
> s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier]
> {noformat}
> I want to read this dataset, but only a subset of date that are not yet in 
> glacier, eg:
> {code:java}
> val from = "2017-07-15"
> val to = "2017-08-24"
> val path = "s3://my-bucket/my-dataset/"
> val X = spark.read.parquet(path).where(col("dt").between(from, to))
> {code}
> Unfortunately, I have the exception
> {noformat}
> java.io.IOException: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  The operation is not valid for the object's storage class (Service: Amazon 
> S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: 
> C444D508B6042138)
> {noformat}
> It seems that Spark does not like a partitioned dataset when some partitions are 
> in Glacier. I could always read specifically each date, add the column with 
> current date and reduce(_ union _) at the end, but not pretty and it should 
> not be necessary.
> Is there any tip to read available data in the datastore even with old data 
> in glacier?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier

2017-08-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140408#comment-16140408
 ] 

Steve Loughran edited comment on SPARK-21797 at 8/24/17 5:56 PM:
-

This is happening deep in the Amazon EMR team's closed source 
{{EmrFileSystem}}, so nothing anyone here at the ASF can deal with directly; 
I'm confident S3A will handle it pretty similarly though, either in the open() 
call or shortly afterwards, in the first read(). All we could do there is 
convert to a more meaningful error, or actually check to see if the file is 
valid at open() time & again, fail meaningfully

At the Spark level, it's because Parquet is trying to read the footer of every 
file in parallel

the good news, you can tell Spark to ignore files it can't read. I believe this 
might be a quick workaround:
{code}
spark.sql.files.ignoreCorruptFiles=true
{code}

Let us know what happens



was (Author: ste...@apache.org):
This is happening deep the Amazon EMR team's closed source {{EmrFileSystem}}, 
so nothing anyone here at the ASF can deal with directly; I'm confident S3A 
will handle it pretty similarly though, either in the open() call or shortly 
afterwards, in the first read(). All we could do there is convert to a more 
meaningful error, or actually check to see if the file is valid at open() time 
& again, fail meaningfully

At the Spark level, it's because Parquet is trying to read the footer of every 
file in parallel

the good news, you can tell Spark to ignore files it can't read. I believe this 
might be a quick workaround:
{code}
spark.sql.files.ignoreCorruptFiles=true
{code}

Let us know what happens


> spark cannot read partitioned data in S3 that are partly in glacier
> ---
>
> Key: SPARK-21797
> URL: https://issues.apache.org/jira/browse/SPARK-21797
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Amazon EMR
>Reporter: Boris Clémençon 
>  Labels: glacier, partitions, read, s3
>
> I have a dataset in parquet in S3 partitioned by date (dt) with oldest date 
> stored in AWS Glacier to save some money. For instance, we have...
> {noformat}
> s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier]
> s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier]
> {noformat}
> I want to read this dataset, but only a subset of date that are not yet in 
> glacier, eg:
> {code:java}
> val from = "2017-07-15"
> val to = "2017-08-24"
> val path = "s3://my-bucket/my-dataset/"
> val X = spark.read.parquet(path).where(col("dt").between(from, to))
> {code}
> Unfortunately, I have the exception
> {noformat}
> java.io.IOException: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  The operation is not valid for the object's storage class (Service: Amazon 
> S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: 
> C444D508B6042138)
> {noformat}
> It seems that Spark does not like a partitioned dataset when some partitions are 
> in Glacier. I could always read specifically each date, add the column with 
> current date and reduce(_ union _) at the end, but not pretty and it should 
> not be necessary.
> Is there any tip to read available data in the datastore even with old data 
> in glacier?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier

2017-08-25 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141483#comment-16141483
 ] 

Steve Loughran commented on SPARK-21797:


bq. According to our test, it is 20% slower maximum to read parquet data from 
S3 than HDFS. Do you agree? 

You are testing the Amazon EMR client. Try with the Hadoop 2.8 JARs and the s3a 
client, enable columnar-store optimised seek with 
spark.hadoop.fs.s3a.experimental.fadvise=random, & see how things compare then. 

You will still be at a disadvantage with any directory scanning/walking which 
can take seconds rather than millis, and seeks are still expensive as you have 
to issue new HTTP requests with different content ranges. And of course, AWS 
throttles your VMs and shard-specific access to subtrees of a single bucket. 
HDFS locally still wins.
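
To make that concrete, a minimal sketch of passing the setting through a Spark session; 
the property name is taken verbatim from the line above, and it only has any effect with 
the Hadoop 2.8+ s3a client on the classpath:

{code}
// Sketch: set the s3a random-IO hint when building the session
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-columnar-read")
  .config("spark.hadoop.fs.s3a.experimental.fadvise", "random")
  .getOrCreate()
{code}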

> spark cannot read partitioned data in S3 that are partly in glacier
> ---
>
> Key: SPARK-21797
> URL: https://issues.apache.org/jira/browse/SPARK-21797
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Amazon EMR
>Reporter: Boris Clémençon 
>  Labels: glacier, partitions, read, s3
>
> I have a dataset in parquet in S3 partitioned by date (dt) with oldest date 
> stored in AWS Glacier to save some money. For instance, we have...
> {noformat}
> s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier]
> s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier]
> ...
> s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier]
> {noformat}
> I want to read this dataset, but only a subset of date that are not yet in 
> glacier, eg:
> {code:java}
> val from = "2017-07-15"
> val to = "2017-08-24"
> val path = "s3://my-bucket/my-dataset/"
> val X = spark.read.parquet(path).where(col("dt").between(from, to))
> {code}
> Unfortunately, I have the exception
> {noformat}
> java.io.IOException: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  The operation is not valid for the object's storage class (Service: Amazon 
> S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: 
> C444D508B6042138)
> {noformat}
> It seems that Spark does not like a partitioned dataset when some partitions are 
> in Glacier. I could always read specifically each date, add the column with 
> current date and reduce(_ union _) at the end, but not pretty and it should 
> not be necessary.
> Is there any tip to read available data in the datastore even with old data 
> in glacier?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21762) FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible

2017-08-17 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-21762:
---
Summary: FileFormatWriter/BasicWriteTaskStatsTracker metrics collection 
fails if a new file isn't yet visible  (was: FileFormatWriter metrics 
collection fails if a newly close()d file isn't yet visible)

> FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new 
> file isn't yet visible
> 
>
> Key: SPARK-21762
> URL: https://issues.apache.org/jira/browse/SPARK-21762
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: object stores without complete creation consistency 
> (this includes AWS S3's caching of negative GET results)
>Reporter: Steve Loughran
>Priority: Minor
>
> The metrics collection of SPARK-20703 can trigger premature failure if the 
> newly written object isn't actually visible yet, that is if, after 
> {{writer.close()}}, a {{getFileStatus(path)}} returns a 
> {{FileNotFoundException}}.
> Strictly speaking, not having a file immediately visible goes against the 
> fundamental expectations of the Hadoop FS APIs, namely full consistent data & 
> medata across all operations, with immediate global visibility of all 
> changes. However, not all object stores make that guarantee, be it only newly 
> created data or updated blobs. And so spurious FNFEs can get raised, ones 
> which *should* have gone away by the time the actual task is committed. Or if 
> they haven't, the job is in such deep trouble.
> What to do?
> # leave as is: fail fast & so catch blobstores/blobstore clients which don't 
> behave as required. One issue here: will that trigger retries, what happens 
> there, etc, etc.
> # Swallow the FNFE and hope the file is observable later.
> # Swallow all IOEs and hope that whatever problem the FS has is transient.
> Options 2 & 3 aren't going to collect metrics in the event of a FNFE, or at 
> least, not the counter of bytes written.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21762) FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible

2017-08-17 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131190#comment-16131190
 ] 

Steve Loughran edited comment on SPARK-21762 at 8/17/17 7:41 PM:
-

SPARK-21669 simplifies this, especially testing, as it's isolated from 
FileFormatWriter. Same problem exists though: if you are getting any Create 
inconsistency, metrics probes trigger failures which may not be present by the 
time task commit actually takes place


was (Author: ste...@apache.org):
SPARK-20703 simplifies this, especially testing, as it's isolated from 
FileFormatWriter. Same problem exists though: if you are getting any Create 
inconsistency, metrics probes trigger failures which may not be present by the 
time task commit actually takes place

> FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new 
> file isn't yet visible
> 
>
> Key: SPARK-21762
> URL: https://issues.apache.org/jira/browse/SPARK-21762
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: object stores without complete creation consistency 
> (this includes AWS S3's caching of negative GET results)
>Reporter: Steve Loughran
>Priority: Minor
>
> The metrics collection of SPARK-20703 can trigger premature failure if the 
> newly written object isn't actually visible yet, that is if, after 
> {{writer.close()}}, a {{getFileStatus(path)}} returns a 
> {{FileNotFoundException}}.
> Strictly speaking, not having a file immediately visible goes against the 
> fundamental expectations of the Hadoop FS APIs, namely full consistent data & 
> medata across all operations, with immediate global visibility of all 
> changes. However, not all object stores make that guarantee, be it only newly 
> created data or updated blobs. And so spurious FNFEs can get raised, ones 
> which *should* have gone away by the time the actual task is committed. Or if 
> they haven't, the job is in such deep trouble.
> What to do?
> # leave as is: fail fast & so catch blobstores/blobstore clients which don't 
> behave as required. One issue here: will that trigger retries, what happens 
> there, etc, etc.
> # Swallow the FNFE and hope the file is observable later.
> # Swallow all IOEs and hope that whatever problem the FS has is transient.
> Options 2 & 3 aren't going to collect metrics in the event of a FNFE, or at 
> least, not the counter of bytes written.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21762) FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible

2017-08-17 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131190#comment-16131190
 ] 

Steve Loughran commented on SPARK-21762:


SPARK-20703 simplifies this, especially testing, as it's isolated from 
FileFormatWriter. Same problem exists though: if you are getting any Create 
inconsistency, metrics probes trigger failures which may not be present by the 
time task commit actually takes place

> FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new 
> file isn't yet visible
> 
>
> Key: SPARK-21762
> URL: https://issues.apache.org/jira/browse/SPARK-21762
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: object stores without complete creation consistency 
> (this includes AWS S3's caching of negative GET results)
>Reporter: Steve Loughran
>Priority: Minor
>
> The metrics collection of SPARK-20703 can trigger premature failure if the 
> newly written object isn't actually visible yet, that is if, after 
> {{writer.close()}}, a {{getFileStatus(path)}} returns a 
> {{FileNotFoundException}}.
> Strictly speaking, not having a file immediately visible goes against the 
> fundamental expectations of the Hadoop FS APIs, namely full consistent data & 
> medata across all operations, with immediate global visibility of all 
> changes. However, not all object stores make that guarantee, be it only newly 
> created data or updated blobs. And so spurious FNFEs can get raised, ones 
> which *should* have gone away by the time the actual task is committed. Or if 
> they haven't, the job is in such deep trouble.
> What to do?
> # leave as is: fail fast & so catch blobstores/blobstore clients which don't 
> behave as required. One issue here: will that trigger retries, what happens 
> there, etc, etc.
> # Swallow the FNFE and hope the file is observable later.
> # Swallow all IOEs and hope that whatever problem the FS has is transient.
> Options 2 & 3 aren't going to collect metrics in the event of a FNFE, or at 
> least, not the counter of bytes written.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed

2017-10-11 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200367#comment-16200367
 ] 

Steve Loughran commented on SPARK-22240:


Amazon EMR is Amazon's own fork of Spark & Hadoop, with their own S3 
connectors: they explicitly say [don't use 
s3a|http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-file-systems.html].

I was about to deny there was any problem with Spark & S3a, but after looking 
into things more, I think that HADOOP-14943 means that S3A is returning the 
filesize as its single partition for a file, when really it should be splitting 
it up. As a result, the partitioning is going to be limited by that set in 
{{SparkContext.defaultMinPartitions}} unless you pass in a partition number to 
your {{SparkContext.hadoopRDD}} calls. I'd do that until I can fix things in 
S3A.

> S3 CSV number of partitions incorrectly computed
> 
>
> Key: SPARK-22240
> URL: https://issues.apache.org/jira/browse/SPARK-22240
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0
>Reporter: Arthur Baudry
>
> Reading CSV out of S3 using S3A protocol does not compute the number of 
> partitions correctly in Spark 2.2.0.
> With Spark 2.2.0 I get only one partition when loading a 14GB file
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 1
> {code}
> While in Spark 2.0.2 I had:
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 115
> {code}
> This introduces obvious performance issues in Spark 2.2.0. Maybe there is a 
> property that should be set to have the number of partitions computed 
> correctly.
> I'm aware that the .option("multiline","true") is not supported in Spark 
> 2.0.2, it's not relevant here.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed

2017-10-11 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200485#comment-16200485
 ] 

Steve Loughran commented on SPARK-22240:


What's the link to the multiline JIRA? As that could explain why nobody else 
has reported it.

> S3 CSV number of partitions incorrectly computed
> 
>
> Key: SPARK-22240
> URL: https://issues.apache.org/jira/browse/SPARK-22240
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0
>Reporter: Arthur Baudry
>
> Reading CSV out of S3 using S3A protocol does not compute the number of 
> partitions correctly in Spark 2.2.0.
> With Spark 2.2.0 I get only one partition when loading a 14GB file
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 1
> {code}
> While in Spark 2.0.2 I had:
> {code:java}
> scala> val input = spark.read.format("csv").option("header", 
> "true").option("delimiter", "|").option("multiLine", 
> "true").load("s3a://")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
> string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 115
> {code}
> This introduces obvious performance issues in Spark 2.2.0. Maybe there is a 
> property that should be set to have the number of partitions computed 
> correctly.
> I'm aware that the .option("multiline","true") is not supported in Spark 
> 2.0.2, it's not relevant here.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9103) Tracking spark's memory usage

2017-09-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182372#comment-16182372
 ] 

Steve Loughran commented on SPARK-9103:
---

If it helps, most uses of ByteBuffer in Hadoop core & HDFS pool their buffers 
through {{org.apache.hadoop.io.ByteBufferPool}} and 
{{org.apache.hadoop.util.DirectBufferPool}}. If some instrumentation could be 
added there just to measure pool size, and pools were created with an owner 
name, then reporting could help apportion blame. Rather than just saying "512MB 
were bytebuffers", it could say "DFS Client used 189MB; CryptoInputStream used 
32MB", etc.
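
A rough sketch of the idea in Scala, using hypothetical names ({{OwnedBufferPool}} and 
{{bytesHeld}} are not existing Hadoop APIs): tag each pool with an owner and track the 
bytes it has handed out, so a memory report can attribute usage per component.

{code:scala}
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong

// Hypothetical instrumented pool: illustration only, not a real Hadoop class.
// Buffer recycling is elided; a real pool would reuse returned buffers.
class OwnedBufferPool(val owner: String) {
  private val handedOut = new AtomicLong(0L)

  def getBuffer(direct: Boolean, length: Int): ByteBuffer = {
    handedOut.addAndGet(length)
    if (direct) ByteBuffer.allocateDirect(length) else ByteBuffer.allocate(length)
  }

  def putBuffer(buffer: ByteBuffer): Unit = handedOut.addAndGet(-buffer.capacity())

  // What a per-owner memory report would read.
  def bytesHeld: Long = handedOut.get()
}
{code}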

> Tracking spark's memory usage
> -
>
> Key: SPARK-9103
> URL: https://issues.apache.org/jira/browse/SPARK-9103
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core, Web UI
>Reporter: Zhang, Liye
> Attachments: Tracking Spark Memory Usage - Phase 1.pdf
>
>
> Currently spark only provides little memory usage information (RDD cache on 
> webUI) for the executors. User have no idea on what is the memory consumption 
> when they are running spark applications with a lot of memory used in spark 
> executors. Especially when they encounter the OOM, it’s really hard to know 
> what is the cause of the problem. So it would be helpful to give out the 
> detail memory consumption information for each part of spark, so that user 
> can clearly have a picture of where the memory is exactly used. 
> The memory usage info to expose should include but not limited to shuffle, 
> cache, network, serializer, etc.
> User can optionally choose to open this functionality since this is mainly 
> for debugging and tuning.






[jira] [Commented] (SPARK-22587) Spark job fails if fs.defaultFS and application jar are different url

2017-11-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266697#comment-16266697
 ] 

Steve Loughran commented on SPARK-22587:


See also 
[FileSystem.CACHE.Key.isEqual()|https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3530]
 is how the filesystem cache compares things (scheme + auth + UGI of owner of 
the FS).

FileSystem doesn't implement its own .equals operator, nor do any subclasses: 
they rely on Object.equals. So if you compare that, then the cache will return 
the same FS instance for both (assuming the conf didn't disable caching). 
You could probably implement a reliable check as follows

{code}
// makeQualified calls checkPath, which throws IllegalArgumentException
// ("Wrong FS") if the path belongs to a different filesystem.
try {
  srcFs.makeQualified(destFs.getWorkingDirectory)
  true
} catch {
  case _: IllegalArgumentException => false
}
{code}

Not ideal, as the false response is expensive, but given that when the two 
filesystems are different the client will copy the file anyway, the cost of 
raising an exception is lost in the noise.

> Spark job fails if fs.defaultFS and application jar are different url
> -
>
> Key: SPARK-22587
> URL: https://issues.apache.org/jira/browse/SPARK-22587
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit
>Affects Versions: 1.6.3
>Reporter: Prabhu Joseph
>
> Spark Job fails if the fs.defaultFs and url where application jar resides are 
> different and having same scheme,
> spark-submit  --conf spark.master=yarn-cluster wasb://XXX/tmp/test.py
> core-site.xml fs.defaultFS is set to wasb:///YYY. Hadoop list works (hadoop 
> fs -ls) works for both the url XXX and YYY.
> {code}
> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: 
> wasb://XXX/tmp/test.py, expected: wasb://YYY 
> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:665) 
> at 
> org.apache.hadoop.fs.azure.NativeAzureFileSystem.checkPath(NativeAzureFileSystem.java:1251)
>  
> at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:485) 
> at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:396) 
> at 
> org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:507)
>  
> at 
> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:660) 
> at 
> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:912)
>  
> at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:172) 
> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1248) 
> at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1307) 
> at org.apache.spark.deploy.yarn.Client.main(Client.scala) 
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  
> at java.lang.reflect.Method.invoke(Method.java:498) 
> at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:751)
>  
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) 
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) 
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) 
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
> {code}
> The code Client.copyFileToRemote tries to resolve the path of application jar 
> (XXX) from the FileSystem object created using fs.defaultFS url (YYY) instead 
> of the actual url of application jar.
> val destFs = destDir.getFileSystem(hadoopConf)
> val srcFs = srcPath.getFileSystem(hadoopConf)
> getFileSystem will create the filesystem based on the url of the path and so 
> this is fine. But the below lines of code tries to get the srcPath (XXX url) 
> from the destFs (YYY url) and so it fails.
> var destPath = srcPath
> val qualifiedDestPath = destFs.makeQualified(destPath)






[jira] [Commented] (SPARK-22587) Spark job fails if fs.defaultFS and application jar are different url

2017-11-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266674#comment-16266674
 ] 

Steve Loughran commented on SPARK-22587:


Jerry had already pulled me in for this; it's one of those little "pits of 
semantics" you can get pulled into, "the incident pit" as they call it in SCUBA.

Summary: you need a case-insensitive check for the scheme and userInfo too, maybe 
even the port. 
# Allow for null though. 
# And consider using {{FileSystem.makeQualified(path)}} as the safety check.

What does Hadoop get up to?
* FileSystem.checkPath does a full check of (scheme, authority), with the auth 
of the canonicalized URI (including default ports), so hdfs://namenode/ and 
hdfs://namenode:9820/ refer to the same FS. That code dates from 2008, so it 
should be considered normative.
* S3AFileSystem.checkPath only looks at hostnames, because it tries to strip 
out user:password from Paths for security reasons.
* Wasb uses FileSystem.checkPath, but does some hacks to also handle an older 
scheme of "asv". I wouldn't worry about that little detail though.
* {{AbstractFileSystem.checkPath}} (the FileContext implementation code) 
doesn't check auth; it looks at the host and notes that on a file:// 
reference the host may be null. It raises {{InvalidPathException}} (a subclass of 
{{IllegalArgumentException}}) if it's unhappy.

Overall then: check auth with an .equalsIgnoreCase(), allow for null. Worry 
about default ports if you really want to. 
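
A minimal sketch of that check in Scala ({{sameFileSystem}} is my name for it, not an 
existing Hadoop API): compare scheme and authority case-insensitively, tolerating nulls.

{code:scala}
import java.net.URI

// Hypothetical helper: true if two URIs point at the same filesystem,
// comparing scheme and authority case-insensitively and allowing nulls
// (e.g. the null host on a file:// reference).
def sameFileSystem(a: URI, b: URI): Boolean = {
  def eq(x: String, y: String): Boolean =
    (x == null && y == null) || (x != null && x.equalsIgnoreCase(y))
  eq(a.getScheme, b.getScheme) && eq(a.getAuthority, b.getAuthority)
}
{code}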

Filed HADOOP-15070 to cover this whole area better in docs & tests, should make 
the FileContext/FileSystem checks consistent and raise the same 
InvalidPathException.

One thing to consider is adding to the FS APIs some predicate 
{{isFileSystemPath(Path p)}} to do the validation without the overhead of 
exception throwing, and implement it in one single (consistent) place. Wouldn't 
be there until Hadoop 3.1 though, so not of any immediate benefit.

thank you for bringing this undocumented, unspecified, untested and 
inconsistent logic to my attention :)


> Spark job fails if fs.defaultFS and application jar are different url
> -
>
> Key: SPARK-22587
> URL: https://issues.apache.org/jira/browse/SPARK-22587
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit
>Affects Versions: 1.6.3
>Reporter: Prabhu Joseph
>
> Spark Job fails if the fs.defaultFs and url where application jar resides are 
> different and having same scheme,
> spark-submit  --conf spark.master=yarn-cluster wasb://XXX/tmp/test.py
> core-site.xml fs.defaultFS is set to wasb:///YYY. Hadoop list works (hadoop 
> fs -ls) works for both the url XXX and YYY.
> {code}
> Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: 
> wasb://XXX/tmp/test.py, expected: wasb://YYY 
> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:665) 
> at 
> org.apache.hadoop.fs.azure.NativeAzureFileSystem.checkPath(NativeAzureFileSystem.java:1251)
>  
> at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:485) 
> at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:396) 
> at 
> org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:507)
>  
> at 
> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:660) 
> at 
> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:912)
>  
> at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:172) 
> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1248) 
> at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1307) 
> at org.apache.spark.deploy.yarn.Client.main(Client.scala) 
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  
> at java.lang.reflect.Method.invoke(Method.java:498) 
> at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:751)
>  
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) 
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) 
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) 
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
> {code}
> The code Client.copyFileToRemote tries to resolve the path of application jar 
> (XXX) from the FileSystem object created using fs.defaultFS url (YYY) instead 
> of the actual url of application jar.
> val destFs = destDir.getFileSystem(hadoopConf)
> val srcFs = srcPath.getFileSystem(hadoopConf)
> getFileSystem will create the filesystem based on the url of the path and so 
> this is fine. But the 

[jira] [Commented] (SPARK-22526) Document closing of PortableDataInputStream in binaryFiles

2017-11-27 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267015#comment-16267015
 ] 

Steve Loughran commented on SPARK-22526:


HADOOP-15071 updates the s3a troubleshooting with what it means when your code 
hangs on many reads; this will be the s3a side of the doc changes.

> Document closing of PortableDataInputStream in binaryFiles
> --
>
> Key: SPARK-22526
> URL: https://issues.apache.org/jira/browse/SPARK-22526
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Spark Core
>Affects Versions: 2.2.0
>Reporter: mohamed imran
>Priority: Minor
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Hi,
> I am using Spark 2.2.0(recent version) to read binary files from S3. I use 
> sc.binaryfiles to read the files.
> It is working fine until some 100 file read but later it get hangs 
> indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in 
> the later releases)
> I tried setting the fs.s3a.connection.maximum to some maximum values but 
> didn't help.
> And finally i ended up using the spark speculation parameter set which is 
> again didnt help much. 
> One thing Which I observed is that it is not closing the connection after 
> every read of binary files from the S3.
> example :- sc.binaryFiles("s3a://test/test123.zip")
> Please look into this major issue!  






[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3

2017-11-24 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265167#comment-16265167
 ] 

Steve Loughran commented on SPARK-22526:


It says it in the javadocs for 
[PortableDataStream.open()|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala#L176].
 I don't see how it could be made much clearer there. I think it's implicitly 
"obvious" that streams need closing, because it's considered standard in all 
java/scala code. You either make sure the code you call does it (docs, or diving 
into it through the IDE), or you do it yourself. As close() is required to be 
idempotent, you can call it on a closed stream, so you may as well.

# There's nothing in {{SparkContext.binaryFiles()}}...you could maybe supply a 
mention there.
# There's only one use of {{PortableDataStream.open()}} which I can see in the 
code: {{PortableDataStream.toArray()}}; it does do a close() in a finally. The 
test uses are mostly verifying path resolution, except for those which use 
toArray, which does, as noted, close things.

I'm sorry you feel let down by the framework, but there's nothing which can be 
done in the code. At some point in the future someone could add something about 
binaryFiles to the markdown docs...perhaps, once you've used it more, you could 
be the volunteer?

> Spark hangs while reading binary files from S3
> --
>
> Key: SPARK-22526
> URL: https://issues.apache.org/jira/browse/SPARK-22526
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: mohamed imran
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Hi,
> I am using Spark 2.2.0(recent version) to read binary files from S3. I use 
> sc.binaryfiles to read the files.
> It is working fine until some 100 file read but later it get hangs 
> indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in 
> the later releases)
> I tried setting the fs.s3a.connection.maximum to some maximum values but 
> didn't help.
> And finally i ended up using the spark speculation parameter set which is 
> again didnt help much. 
> One thing Which I observed is that it is not closing the connection after 
> every read of binary files from the S3.
> example :- sc.binaryFiles("s3a://test/test123.zip")
> Please look into this major issue!  






[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3

2017-11-23 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264095#comment-16264095
 ] 

Steve Loughran commented on SPARK-22526:


# Fix the code you invoke
# Wrap the code you invoke with something like the following (sketched here in the 
JIRA, untested; it should really close the stream in something that swallows IOEs):

{code}
binaryRdd.map { t =>
  try {
process(t._2)
  } finally {
t._2.close()
  }
}
{code}
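
A variant of that sketch which also swallows any IOException thrown by close(), so a 
failure while closing doesn't mask the result of the work (again untested; {{process}} 
stands in for whatever the job does with each stream):

{code:scala}
binaryRdd.map { case (_, stream) =>
  try {
    process(stream)
  } finally {
    // swallow the IOException close() may raise; the work is already done
    try { stream.close() } catch { case _: java.io.IOException => }
  }
}
{code}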


> Spark hangs while reading binary files from S3
> --
>
> Key: SPARK-22526
> URL: https://issues.apache.org/jira/browse/SPARK-22526
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: mohamed imran
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Hi,
> I am using Spark 2.2.0(recent version) to read binary files from S3. I use 
> sc.binaryfiles to read the files.
> It is working fine until some 100 file read but later it get hangs 
> indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in 
> the later releases)
> I tried setting the fs.s3a.connection.maximum to some maximum values but 
> didn't help.
> And finally i ended up using the spark speculation parameter set which is 
> again didnt help much. 
> One thing Which I observed is that it is not closing the connection after 
> every read of binary files from the S3.
> example :- sc.binaryFiles("s3a://test/test123.zip")
> Please look into this major issue!  






[jira] [Comment Edited] (SPARK-22526) Spark hangs while reading binary files from S3

2017-11-23 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264095#comment-16264095
 ] 

Steve Loughran edited comment on SPARK-22526 at 11/23/17 3:47 PM:
--

# Fix the code you invoke
# Wrap the code you invoke with something like the following (sketched here in the 
JIRA, untested; it should really close the stream in something that swallows IOEs):

{code}
binaryRdd.map { t =>
  try {
process(t._2)
  } finally {
t._2.close()
  }
}
{code}



was (Author: ste...@apache.org):
# Fix the code you invoke
#. wrap the code you  invoke with something like (and this is coded in the 
JIRA, untested & should really close the stream in something to swallow IOEs.

{code}
binaryRdd.map { t =>
  try {
process(t._2)
  } finally {
t._2.close()
  }
}
{code}


> Spark hangs while reading binary files from S3
> --
>
> Key: SPARK-22526
> URL: https://issues.apache.org/jira/browse/SPARK-22526
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: mohamed imran
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Hi,
> I am using Spark 2.2.0(recent version) to read binary files from S3. I use 
> sc.binaryfiles to read the files.
> It is working fine until some 100 file read but later it get hangs 
> indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in 
> the later releases)
> I tried setting the fs.s3a.connection.maximum to some maximum values but 
> didn't help.
> And finally i ended up using the spark speculation parameter set which is 
> again didnt help much. 
> One thing Which I observed is that it is not closing the connection after 
> every read of binary files from the S3.
> example :- sc.binaryFiles("s3a://test/test123.zip")
> Please look into this major issue!  






[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3

2017-11-23 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264496#comment-16264496
 ] 

Steve Loughran commented on SPARK-22526:


I'm not giving a permanent fix. It's a bug in your code, or the code you are 
invoking forgets to close the input stream.

Unless [~srowen] has other ideas, I'd recommend closing this as one of 
WORKSFORME, WONTFIX or INVALID.

> Spark hangs while reading binary files from S3
> --
>
> Key: SPARK-22526
> URL: https://issues.apache.org/jira/browse/SPARK-22526
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: mohamed imran
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Hi,
> I am using Spark 2.2.0(recent version) to read binary files from S3. I use 
> sc.binaryfiles to read the files.
> It is working fine until some 100 file read but later it get hangs 
> indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in 
> the later releases)
> I tried setting the fs.s3a.connection.maximum to some maximum values but 
> didn't help.
> And finally i ended up using the spark speculation parameter set which is 
> again didnt help much. 
> One thing Which I observed is that it is not closing the connection after 
> every read of binary files from the S3.
> example :- sc.binaryFiles("s3a://test/test123.zip")
> Please look into this major issue!  






[jira] [Commented] (SPARK-22657) Hadoop fs implementation classes are not loaded if they are part of the app jar or other jar when --packages flag is used

2017-11-30 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16272974#comment-16272974
 ] 

Steve Loughran commented on SPARK-22657:


Hadoop FileSystem service introspection for FS binding has turned out to be 
more pain than it was worth; it needs to be revisited, but for other reasons. 

Why not just set fs.s3n.impl=org.apache.hadoop.fs.s3native.NativeS3FileSystem 
and bypass the service loader? Better yet, switch to s3a, which declares itself 
that way anyway on Hadoop 2.8+. 
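
A minimal sketch of that workaround in Spark (assuming {{sc}} is the job's SparkContext); 
setting the property explicitly means the scheme resolves without depending on the 
ServiceLoader scan that ran before the app jar was on the classpath:

{code:scala}
// Explicitly bind the s3n scheme to its implementation, bypassing
// ServiceLoader discovery.
sc.hadoopConfiguration.set(
  "fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

// Or the s3a equivalent (class lives in hadoop-aws):
// sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
{code}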

> Hadoop fs implementation classes are not loaded if they are part of the app 
> jar or other jar when --packages flag is used 
> --
>
> Key: SPARK-22657
> URL: https://issues.apache.org/jira/browse/SPARK-22657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Stavros Kontopoulos
>
> To reproduce this issue run:
> ./bin/spark-submit --master mesos://leader.mesos:5050 \
> --packages com.github.scopt:scopt_2.11:3.5.0 \
> --conf spark.cores.max=8 \
> --conf 
> spark.mesos.executor.docker.image=mesosphere/spark:beta-2.1.1-2.2.0-2-hadoop-2.6
>  \
> --conf spark.mesos.executor.docker.forcePullImage=true \
> --class S3Job 
> http://s3-us-west-2.amazonaws.com/arand-sandbox-mesosphere/dcos-spark-scala-tests-assembly-0.1-SNAPSHOT.jar
>  \
> --readUrl s3n://arand-sandbox-mesosphere/big.txt --writeUrl 
> s3n://arand-sandbox-mesosphere/linecount.out
> within a container created with 
> mesosphere/spark:beta-2.1.1-2.2.0-2-hadoop-2.6 image
> You will get: "Exception in thread "main" java.io.IOException: No FileSystem 
> for scheme: s3n"
> This can be run reproduced with local[*] as well, no need to use mesos, this 
> is not mesos bug.
> The specific spark job used above can be found here: 
> https://github.com/mesosphere/spark-build/blob/d5c50e9ae3b1438e0c4ba96ff9f36d5dafb6a466/tests/jobs/scala/src/main/scala/S3Job.scala
>   
> Can be built with sbt assembly in that dir.
> Using this code : 
> https://gist.github.com/skonto/4f5ff1e5ede864f90b323cc20bf1e1cbat the 
> beginning of the main method...
> you get the following output : 
> https://gist.github.com/skonto/d22b8431586b6663ddd720e179030da4
> (Use 
> http://s3-eu-west-1.amazonaws.com/fdp-stavros-test/dcos-spark-scala-tests-assembly-0.1-SNAPSHOT.jar
>  to to get the modified job)
> The job works fine if --packages is not used.
> The commit that introduced this issue is (before that things work as 
> expected):
> 5800144a54f5c0180ccf67392f32c3e8a51119b1[m -[33m[m [SPARK-21012][SUBMIT] Add 
> glob support for resources adding to Spark [32m(5 months ago) 
> [1;34m[m Thu, 6 Jul 2017 15:32:49 +0800
> The exception comes from here: 
> https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3311
> https://github.com/apache/spark/pull/18235/files, check line 950, this is 
> where a filesystem is first created.
> The Filesystem class is initialized there, before the main of the spark job 
> is launched... the reason is --packages logic uses hadoop libraries to 
> download files
> Maven resolution happens before the app jar and the resolved jars are added 
> to the classpath. So at that moment there is no s3n to add to the static map 
> when the Filesystem static members are first initialized and also filled due 
> to the first FileSystem instance created (SERVICE_FILE_SYSTEMS).
> Later in the spark job main where we try to access the s3n filesystem (create 
> a second filesystem) we get the exception (at this point the app jar has the 
> s3n implementation in it and its on the class path but that scheme is not 
> loaded in the static map of the Filesystem class)... 
> hadoopConf.set("fs.s3n.impl.disable.cache", "true") has no effect since the 
> problem is with the static map which is filled once and only once.
> That's why we see two prints of the map contents in the output(gist)  above 
> when --packages is used. The first print is before creating the s3n 
> filesystem. We use reflection there to get the static map's entries. When 
> --packages is not used that map is empty before creating the s3n filesystem 
> since up to that point the Filesystem class is not yet loaded by the 
> classloader.






[jira] [Commented] (SPARK-22657) Hadoop fs implementation classes are not loaded if they are part of the app jar or other jar when --packages flag is used

2017-11-30 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16273191#comment-16273191
 ] 

Steve Loughran commented on SPARK-22657:


If you look at HADOOP-14138 you can see why we cut s3a from the list; 
HADOOP-14132 shows how we culled the references. Not only was it brittle, there was 
too much overhead in classloading, especially with shading. I think we could switch 
to something more minimal, but even there we hadn't planned to reset the enum on 
classloader changes. Happy to add a static method to trigger that, though. Indeed, 
if you submit the patch for HADOOP-14132, with tests, you can make sure it suits your 
needs.

> Hadoop fs implementation classes are not loaded if they are part of the app 
> jar or other jar when --packages flag is used 
> --
>
> Key: SPARK-22657
> URL: https://issues.apache.org/jira/browse/SPARK-22657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Stavros Kontopoulos
>
> To reproduce this issue run:
> ./bin/spark-submit --master mesos://leader.mesos:5050 \
> --packages com.github.scopt:scopt_2.11:3.5.0 \
> --conf spark.cores.max=8 \
> --conf 
> spark.mesos.executor.docker.image=mesosphere/spark:beta-2.1.1-2.2.0-2-hadoop-2.6
>  \
> --conf spark.mesos.executor.docker.forcePullImage=true \
> --class S3Job 
> http://s3-us-west-2.amazonaws.com/arand-sandbox-mesosphere/dcos-spark-scala-tests-assembly-0.1-SNAPSHOT.jar
>  \
> --readUrl s3n://arand-sandbox-mesosphere/big.txt --writeUrl 
> s3n://arand-sandbox-mesosphere/linecount.out
> within a container created with 
> mesosphere/spark:beta-2.1.1-2.2.0-2-hadoop-2.6 image
> You will get: "Exception in thread "main" java.io.IOException: No FileSystem 
> for scheme: s3n"
> This can be run reproduced with local[*] as well, no need to use mesos, this 
> is not mesos bug.
> The specific spark job used above can be found here: 
> https://github.com/mesosphere/spark-build/blob/d5c50e9ae3b1438e0c4ba96ff9f36d5dafb6a466/tests/jobs/scala/src/main/scala/S3Job.scala
>   
> Can be built with sbt assembly in that dir.
> Using this code : 
> https://gist.github.com/skonto/4f5ff1e5ede864f90b323cc20bf1e1cbat the 
> beginning of the main method...
> you get the following output : 
> https://gist.github.com/skonto/d22b8431586b6663ddd720e179030da4
> (Use 
> http://s3-eu-west-1.amazonaws.com/fdp-stavros-test/dcos-spark-scala-tests-assembly-0.1-SNAPSHOT.jar
>  to to get the modified job)
> The job works fine if --packages is not used.
> The commit that introduced this issue is (before that things work as 
> expected):
> 5800144a54f5c0180ccf67392f32c3e8a51119b1[m -[33m[m [SPARK-21012][SUBMIT] Add 
> glob support for resources adding to Spark [32m(5 months ago) 
> [1;34m[m Thu, 6 Jul 2017 15:32:49 +0800
> The exception comes from here: 
> https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3311
> https://github.com/apache/spark/pull/18235/files, check line 950, this is 
> where a filesystem is first created.
> The Filesystem class is initialized there, before the main of the spark job 
> is launched... the reason is --packages logic uses hadoop libraries to 
> download files
> Maven resolution happens before the app jar and the resolved jars are added 
> to the classpath. So at that moment there is no s3n to add to the static map 
> when the Filesystem static members are first initialized and also filled due 
> to the first FileSystem instance created (SERVICE_FILE_SYSTEMS).
> Later in the spark job main where we try to access the s3n filesystem (create 
> a second filesystem) we get the exception (at this point the app jar has the 
> s3n implementation in it and its on the class path but that scheme is not 
> loaded in the static map of the Filesystem class)... 
> hadoopConf.set("fs.s3n.impl.disable.cache", "true") has no effect since the 
> problem is with the static map which is filled once and only once.
> That's why we see two prints of the map contents in the output(gist)  above 
> when --packages is used. The first print is before creating the s3n 
> filesystem. We use reflection there to get the static map's entries. When 
> --packages is not used that map is empty before creating the s3n filesystem 
> since up to that point the Filesystem class is not yet loaded by the 
> classloader.






[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3

2017-11-22 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263021#comment-16263021
 ] 

Steve Loughran commented on SPARK-22526:


If the input stream doesn't get closed, there is probably going to be a GET 
kept open on every request. So I'd make sure that whatever you are running in 
your RDD closes the stream. As the docs of {{PortableDataStream.open()}} say, "The user 
of this method is responsible for closing the stream after usage."

> Spark hangs while reading binary files from S3
> --
>
> Key: SPARK-22526
> URL: https://issues.apache.org/jira/browse/SPARK-22526
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: mohamed imran
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Hi,
> I am using Spark 2.2.0(recent version) to read binary files from S3. I use 
> sc.binaryfiles to read the files.
> It is working fine until some 100 file read but later it get hangs 
> indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in 
> the later releases)
> I tried setting the fs.s3a.connection.maximum to some maximum values but 
> didn't help.
> And finally i ended up using the spark speculation parameter set which is 
> again didnt help much. 
> One thing Which I observed is that it is not closing the connection after 
> every read of binary files from the S3.
> example :- sc.binaryFiles("s3a://test/test123.zip")
> Please look into this major issue!  






[jira] [Commented] (SPARK-18294) Implement commit protocol to support `mapred` package's committer

2017-12-15 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292595#comment-16292595
 ] 

Steve Loughran commented on SPARK-18294:


Following up on this, one question: why support the older mapred protocol? 

The standard impl in Hadoop just relays to the new stuff; it just complicates 
everyone's life, as there are two test paths, APIs to document, and a risk of 
different failure modes.

The v1 API isn't being actively developed, and really it's time to move off it.

> Implement commit protocol to support `mapred` package's committer
> -
>
> Key: SPARK-18294
> URL: https://issues.apache.org/jira/browse/SPARK-18294
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Jiang Xingbo
>Assignee: Jiang Xingbo
> Fix For: 2.3.0
>
>
> Current `FileCommitProtocol` is based on `mapreduce` package, we should 
> implement a `HadoopMapRedCommitProtocol` that supports the older mapred 
> package's commiter.






[jira] [Commented] (SPARK-14959) ​Problem Reading partitioned ORC or Parquet files

2017-11-17 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256789#comment-16256789
 ] 

Steve Loughran commented on SPARK-14959:


Came across a reference to this while scanning for getFileBlockLocations() use.

HDFS shouldn't be throwing this. {{getFileBlockLocations(Path, offset, len)}} 
is nominally the same as {{getFileBlockLocations(getFileStatus(Path), offset, 
len)}}; the latter will return an empty array on a directory. Looks like the 
HDFS behaviour has been there for years, and people can argue that it's the 
correct behaviour: but it's the only subclass that diverges from the base 
FileSystem implementation here, and the base implementation doesn't fail on a 
directory. Maybe it can be fixed; at the very least the behaviour needs to be 
specified explicitly. 
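
To illustrate the two calls being compared (a sketch; {{fs}} and {{dirPath}} are 
assumed to be an HDFS FileSystem instance and a directory path):

{code:scala}
// Path-based overload: on HDFS this throws FileNotFoundException
// ("Path is not a file") when dirPath is a directory.
val byPath = fs.getFileBlockLocations(dirPath, 0, Long.MaxValue)

// FileStatus-based overload: the base FileSystem implementation returns
// an empty array for a directory instead of failing.
val byStatus = fs.getFileBlockLocations(fs.getFileStatus(dirPath), 0, Long.MaxValue)
{code}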

> ​Problem Reading partitioned ORC or Parquet files
> -
>
> Key: SPARK-14959
> URL: https://issues.apache.org/jira/browse/SPARK-14959
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
> Environment: Hadoop 2.7.1.2.4.0.0-169 (HDP 2.4)
>Reporter: Sebastian YEPES FERNANDEZ
>Assignee: Xin Wu
>Priority: Blocker
> Fix For: 2.0.0
>
>
> Hello,
> I have noticed that in the pasts days there is an issue when trying to read 
> partitioned files from HDFS.
> I am running on Spark master branch #c544356
> The write actually works but the read fails.
> {code:title=Issue Reproduction}
> case class Data(id: Int, text: String)
> val ds = spark.createDataset( Seq(Data(0, "hello"), Data(1, "hello"), Data(0, 
> "world"), Data(1, "there")) )
> scala> 
> ds.write.mode(org.apache.spark.sql.SaveMode.Overwrite).format("parquet").partitionBy("id").save("/user/spark/test.parquet")
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
>   
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> java.io.FileNotFoundException: Path is not a file: 
> /user/spark/test.parquet/id=0
> at 
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75)
> at 
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
> at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:652)
> at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
> at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2151)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2147)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2145)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>   at 
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>   at 
> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1242)
>   at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1227)
>   at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1285)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:221)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:217)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at 
> 

[jira] [Commented] (SPARK-16996) Hive ACID delta files not seen

2017-11-16 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16255329#comment-16255329
 ] 

Steve Loughran commented on SPARK-16996:


[~maver1ck]

Spark's Hive is a custom build, as it was modified to sort out classpath 
inconsistencies between things (Kryo, Spark, Hive, something else). You can't mix 
them without seeing a stack trace somewhere...the main variable is "where", not 
"whether".

Given this is HDP, not the ASF binaries, I'd take it up via the support process 
there, starting with the [online 
forums|https://community.hortonworks.com/index.html].

> Hive ACID delta files not seen
> --
>
> Key: SPARK-16996
> URL: https://issues.apache.org/jira/browse/SPARK-16996
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2, 1.6.3, 2.1.2, 2.2.0
> Environment: Hive 1.2.1, Spark 1.5.2
>Reporter: Benjamin BONNET
>Priority: Critical
>
> spark-sql seems not to see data stored as delta files in an ACID Hive table.
> Actually I encountered the same problem as describe here : 
> http://stackoverflow.com/questions/35955666/spark-sql-is-not-returning-records-for-hive-transactional-tables-on-hdp
> For example, create an ACID table with HiveCLI and insert a row :
> {code}
> set hive.support.concurrency=true;
> set hive.enforce.bucketing=true;
> set hive.exec.dynamic.partition.mode=nonstrict;
> set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
> set hive.compactor.initiator.on=true;
> set hive.compactor.worker.threads=1;
>  CREATE TABLE deltas(cle string,valeur string) CLUSTERED BY (cle) INTO 1 
> BUCKETS
> ROW FORMAT SERDE  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
> STORED AS 
>   INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
>   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
> TBLPROPERTIES ('transactional'='true');
> INSERT INTO deltas VALUES("a","a");
> {code}
> Then make a query with spark-sql CLI :
> {code}
> SELECT * FROM deltas;
> {code}
> That query gets no result and there are no errors in logs.
> If you go to HDFS to inspect table files, you find only deltas
> {code}
> ~>hdfs dfs -ls /apps/hive/warehouse/deltas
> Found 1 items
> drwxr-x---   - me hdfs  0 2016-08-10 14:03 
> /apps/hive/warehouse/deltas/delta_0020943_0020943
> {code}
> Then if you run compaction on that table (in HiveCLI) :
> {code}
> ALTER TABLE deltas COMPACT 'MAJOR';
> {code}
> As a result, the delta will be compute into a base file :
> {code}
> ~>hdfs dfs -ls /apps/hive/warehouse/deltas
> Found 1 items
> drwxrwxrwx   - me hdfs  0 2016-08-10 15:25 
> /apps/hive/warehouse/deltas/base_0020943
> {code}
> Go back to spark-sql and the same query gets a result :
> {code}
> SELECT * FROM deltas;
> a   a
> Time taken: 0.477 seconds, Fetched 1 row(s)
> {code}
> But next time you make an insert into Hive table : 
> {code}
> INSERT INTO deltas VALUES("b","b");
> {code}
> spark-sql will immediately see changes : 
> {code}
> SELECT * FROM deltas;
> a   a
> b   b
> Time taken: 0.122 seconds, Fetched 2 row(s)
> {code}
> Yet there was no other compaction, but spark-sql "sees" the base AND the 
> delta file :
> {code}
> ~> hdfs dfs -ls /apps/hive/warehouse/deltas
> Found 2 items
> drwxrwxrwx   - valdata hdfs  0 2016-08-10 15:25 
> /apps/hive/warehouse/deltas/base_0020943
> drwxr-x---   - valdata hdfs  0 2016-08-10 15:31 
> /apps/hive/warehouse/deltas/delta_0020956_0020956
> {code}






[jira] [Commented] (SPARK-17593) list files on s3 very slow

2017-11-10 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247886#comment-16247886
 ] 

Steve Loughran commented on SPARK-17593:


Hey Nick, 

Yes, need to move to {{FileSystem.listFiles(path, recursive = true)}} and then 
iterate through the results. This actually scales better to the many thousands of 
files, but you'd know that. Not done a patch for that myself.

Moving to that won't be any worse for Spark on older Hadoop versions, but will 
get the read-time speedup on Hadoop 2.8+. Write performance is a separate issue, 
which is really "commit algorithms for blob storage". 
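
A minimal sketch of that recursive listing (using the bucket/prefix from this report and 
assuming {{sc}} is the SparkContext); on Hadoop 2.8+ S3A can serve the single recursive 
{{listFiles()}} call from paged object listings instead of walking directories one at a time:

{code:scala}
import org.apache.hadoop.fs.{FileStatus, Path}
import scala.collection.mutable.ArrayBuffer

val root = new Path("s3a://bucket_name/events_v3")
val fs = root.getFileSystem(sc.hadoopConfiguration)

// One recursive listFiles() call instead of a listStatus() per directory.
val files = new ArrayBuffer[FileStatus]()
val it = fs.listFiles(root, true) // recursive = true
while (it.hasNext) {
  files += it.next()
}
println(s"found ${files.size} files")
{code}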

> list files on s3 very slow
> --
>
> Key: SPARK-17593
> URL: https://issues.apache.org/jira/browse/SPARK-17593
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.0
> Environment: spark 2.0.0, hadoop 2.7.2 ( hadoop 2.7.3)
>Reporter: Gaurav Shah
>Priority: Minor
>
> lets say we have following partitioned data:
> {code}
> events_v3
> -- event_date=2015-01-01
>  event_hour=0
> -- verb=follow
> part1.parquet.gz 
>  event_hour=1
> -- verb=click
> part1.parquet.gz 
> -- event_date=2015-01-02
>  event_hour=5
> -- verb=follow
> part1.parquet.gz 
>  event_hour=10
> -- verb=click
> part1.parquet.gz 
> {code}
> To read (or write ) parquet partitioned data via spark it makes call to 
> `ListingFileCatalog.listLeafFiles` .  Which recursively tries to list all 
> files and folders.
> In this case if we had 300 dates, we would have created 300 jobs each trying 
> to get filelist from date_directory. This process takes about 10 minutes to 
> finish ( with 2 executors). vs if I use a ruby script to get list of all 
> files recursively in the same folder it takes about 1 minute, on the same 
> machine with just 1 thread. 
> I am confused as to why it would take so much time extra for listing files.
> spark code:
> {code:scala}
> val sparkSession = org.apache.spark.sql.SparkSession.builder
> .config("spark.sql.hive.metastorePartitionPruning",true)
> .config("spark.sql.parquet.filterPushdown", true)
> .config("spark.sql.hive.verifyPartitionPath", false)
> .config("spark.sql.hive.convertMetastoreParquet.mergeSchema",false)
> .config("parquet.enable.summary-metadata",false)
> .config("spark.sql.sources.partitionDiscovery.enabled",false)
> .getOrCreate()
> val df = 
> sparkSession.read.option("mergeSchema","false").format("parquet").load("s3n://bucket_name/events_v3")
> df.createOrReplaceTempView("temp_events")
> sparkSession.sql(
>   """
> |select verb,count(*) from temp_events where event_date = 
> "2016-08-05" group by verb
>   """.stripMargin).show()
> {code}
> ruby code:
> {code:ruby}
> gem 'aws-sdk', '~> 2'
> require 'aws-sdk'
> client = Aws::S3::Client.new(:region=>'us-west-1')
> next_continuation_token = nil
> total = 0
> loop do
> a= client.list_objects_v2({
>   bucket: "bucket", # required
>   max_keys: 1000,
>   prefix: "events_v3/",
>   continuation_token: next_continuation_token ,
>   fetch_owner: false,
> })
> puts a.contents.last.key
> total += a.contents.size
> next_continuation_token = a.next_continuation_token
> break unless a.is_truncated
> end
> puts "total"
> puts total
> {code}
> tried looking into following bug:
> https://issues.apache.org/jira/browse/HADOOP-12810
> but hadoop 2.7.3 doesn't solve that problem
> stackoverflow reference:
> http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow





