[jira] [Commented] (SPARK-20703) Add an operator for writing data out
[ https://issues.apache.org/jira/browse/SPARK-20703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085559#comment-16085559 ] Steve Loughran commented on SPARK-20703: Regarding a patch for this, what do people suggest as a good policy for failures?
* FNFE: log @ debug, return 0
* IOE: log @ info, full stack @ debug, return 0

Or should FNFE be treated the same as any other IOE (which is most likely to be a transient network failure, given that by this point we know the user has connectivity, auth and write access to the bucket)?

# I would like the option of printing that full stack, so that if the time comes to debug it, saying "edit this log setting" is enough to make problems visible in production.
# Yet I don't want to log the full stack by default, as that can lead to panic; it also tends to mean that when there are real problems in the code, people may just paste that stack in rather than the one for the real problem.

> Add an operator for writing data out > > > Key: SPARK-20703 > URL: https://issues.apache.org/jira/browse/SPARK-20703 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.2.0 >Reporter: Reynold Xin >Assignee: Liang-Chi Hsieh > Fix For: 2.3.0 > > > We should add an operator for writing data out. Right now in the explain plan > / UI there is no way to tell whether a query is writing data out, and also > there is no way to associate metrics with data writes. It'd be tremendously > valuable to do this for adding metrics and for visibility. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
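The failure policy proposed in the SPARK-20703 comment (FNFE logged at debug, other IOEs logged at info with the full stack only at debug, both returning 0) can be sketched as below. This is a minimal illustration, assuming {{java.util.logging}} with {{FINE}} standing in for debug and a hypothetical {{WriteStats.outputSize}} helper; it is not the code of the eventual patch.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: report the size of a written file without ever failing the write. */
final class WriteStats {
    private static final Logger LOG = Logger.getLogger(WriteStats.class.getName());

    private WriteStats() {}

    /** Returns the file length in bytes, or 0 if it cannot be determined. */
    static long outputSize(Path path) {
        try {
            return Files.size(path);
        } catch (NoSuchFileException | FileNotFoundException e) {
            // FNFE (NIO reports it as NoSuchFileException): log at debug level only.
            LOG.log(Level.FINE, "Output file not found: {0}", path);
            return 0;
        } catch (IOException e) {
            // Any other IOE: one line at info, full stack trace only when debug is enabled.
            LOG.info("Unable to get size of " + path + ": " + e);
            LOG.log(Level.FINE, "Stack trace for " + path, e);
            return 0;
        }
    }
}
```

Lowering this one logger to debug is then the "edit this log setting" step; note that with {{java.util.logging}} the handler's level must be lowered too, not just the logger's.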
[jira] [Commented] (SPARK-21374) Reading globbed paths from S3 into DF doesn't work if filesystem caching is disabled
[ https://issues.apache.org/jira/browse/SPARK-21374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085645#comment-16085645 ] Steve Loughran commented on SPARK-21374: This is possibly a sign that your new configuration isn't having its auth values picked up, or that they are incorrect (i.e. the property names are wrong). It works with caching enabled because some other codepath has already set up the filesystem with the right properties, so when it is used by the DF, those earlier parameters are picked up.

# If using Hadoop 2.7.x JARs, switch to s3a and use s3a in the URLs and settings. You don't need to set the fs.s3a.impl field either; that's done for you.
# If you can upgrade to Hadoop 2.8 binaries, you can use per-bucket configuration. This does exactly what you want: it lets you configure different auth details for different buckets, without having to play these games. See [https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Configuring_different_S3_buckets] and [https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_cloud-data-access/content/s3-per-bucket-configs.html].

Going to 2.8 binaries (or anything with the feature backported to a 2.7.x variant) should solve your problem without you having to worry about what you are seeing here.

> Reading globbed paths from S3 into DF doesn't work if filesystem caching is > disabled > > > Key: SPARK-21374 > URL: https://issues.apache.org/jira/browse/SPARK-21374 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.2, 2.1.1 >Reporter: Andrey Taptunov > > *Motivation:* > In my case I want to disable filesystem cache to be able to change S3's > access key and secret key on the fly to read from buckets with different > permissions. This works perfectly fine for RDDs but doesn't work for DFs.
> *Example (works for RDD but fails for DataFrame):*
> {code:scala}
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
>
> object SimpleApp {
>   def main(args: Array[String]): Unit = {
>     val awsAccessKeyId = "something"
>     val awsSecretKey = "something else"
>     val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
>     val sc = new SparkContext(conf)
>     sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", awsAccessKeyId)
>     sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", awsSecretKey)
>     sc.hadoopConfiguration.setBoolean("fs.s3.impl.disable.cache", true)
>     sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>     sc.hadoopConfiguration.set("fs.s3.buffer.dir", "/tmp")
>     val spark = SparkSession.builder().config(conf).getOrCreate()
>
>     val rddFile = sc.textFile("s3://bucket/file.csv").count // ok
>     val rddGlob = sc.textFile("s3://bucket/*").count // ok
>     val dfFile = spark.read.format("csv").load("s3://bucket/file.csv").count // ok
>
>     val dfGlob = spark.read.format("csv").load("s3://bucket/*").count
>     // IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified
>     // as the username or password (respectively) of a s3 URL, or by setting the
>     // fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
>
>     sc.stop()
>   }
> }
> {code}
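Per-bucket configuration, as documented for S3A in Hadoop 2.8, lets {{fs.s3a.bucket.BUCKET.OPTION}} override {{fs.s3a.OPTION}} when talking to that one bucket, so a single configuration can carry several sets of credentials without disabling the filesystem cache. The sketch below models only that lookup rule on a plain {{Map}}; it is an illustration under that assumption, not S3A's implementation, and the bucket names and key values in the usage are made up.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of S3A-style per-bucket overrides, modelled on a plain Map instead of a Hadoop Configuration. */
final class PerBucketConfig {
    static final String BASE_PREFIX = "fs.s3a.";
    static final String BUCKET_PREFIX = "fs.s3a.bucket.";

    private PerBucketConfig() {}

    /** Returns a copy of conf in which fs.s3a.bucket.BUCKET.OPTION overrides fs.s3a.OPTION. */
    static Map<String, String> forBucket(Map<String, String> conf, String bucket) {
        Map<String, String> effective = new HashMap<>(conf);
        // Trailing dot stops bucket "private" from matching "private-data".
        String prefix = BUCKET_PREFIX + bucket + ".";
        for (Map.Entry<String, String> e : conf.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                String option = e.getKey().substring(prefix.length());
                effective.put(BASE_PREFIX + option, e.getValue());
            }
        }
        return effective;
    }
}
```

With {{fs.s3a.access.key}} as the default and {{fs.s3a.bucket.private-data.access.key}} set, reads from {{s3a://private-data/}} would use the override while every other bucket keeps the default.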
[jira] [Commented] (SPARK-19790) OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure
[ https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088630#comment-16088630 ] Steve Loughran commented on SPARK-19790: I've now summarised the FileOutputCommitter v1 and v2 algorithms as far as I can understand them from the source and some step-throughs; I think I'd need to add a few more tests to be really sure I understand them: [https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md]

h2. v1

# Task commit: atomic rename() of {{$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID}} to the completed dir {{$dest/_temporary/$jobAttempt/$taskAttempt}}; job commit later moves all data under each committed task into $dest. Because only one rename() to that destination can succeed, race conditions in speculative work are prevented without the need for explicit coordination.
# Failure during task commit: delete the completed task directory (if any), rerun the task.
# Job commit: list the completed tasks and move/merge them into the dest dir. This is not atomic, and the rename operation count is per task and per directory tree. A failure in job commit is not recoverable.
# Failure during job commit: the job is in an unknown state; rm $dest and rerun.
# Job restart: after failure of the entire job, it can be restarted, with the restarted job using the completed tasks. Provided rename() is atomic, there's a guarantee that every task's completed dir is valid; provided it's O(1), it's an inexpensive operation.

h2. v2

# Task commit: copy straight to dest (this is where coordination is needed between the tasks and the job manager).
# Job commit: no-op.
# Job restart: none; start again.
# Failure during task commit: the dest dir is in an unknown state; a job restart is needed.
# Failure during job commit: the job is in an unknown state; rm $dest and rerun.

Given that Spark doesn't do job restart, switching to v2 everywhere reduces the number of renames, but makes recovery from a failure during task commit impossible.

Neither algorithm is correct on an inconsistent S3 endpoint, as both can get incorrect listings of the files to COPY + DELETE. Even with consistency, you still have O(data) task commits with both algorithms, and another O(data) job commit with v1. Azure WASB is consistent, and uses leases for exclusive access to parts of the store on commit, but even it could benefit from a store-specific committer. Rename is not the solution to committing data in an object store.

> OutputCommitCoordinator should not allow another task to commit after an > ExecutorFailure > > > Key: SPARK-19790 > URL: https://issues.apache.org/jira/browse/SPARK-19790 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Imran Rashid > > The OutputCommitCoordinator resets the allowed committer when the task fails. > > https://github.com/apache/spark/blob/8aa560b75e6b083b2a890c52301414285ba35c3d/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala#L143 > However, if a task fails because of an ExecutorFailure, we actually have no > idea what the status is of the task. The task may actually still be running, > and perhaps successfully commit its output. By allowing another task to > commit its output, there is a chance that multiple tasks commit, which can > result in corrupt output. This would be particularly problematic when > commit() is an expensive operation, eg. moving files on S3. > For other task failures, we can allow other tasks to commit. But with an > ExecutorFailure, its not clear what the right thing to do is. The only safe > thing to do may be to fail the job. > This is related to SPARK-19631, and was discovered during discussion on that > PR https://github.com/apache/spark/pull/16959#discussion_r103549134
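The v1 steps summarised in the SPARK-19790 comment can be modelled on the local filesystem with {{java.nio}}: task commit is a single directory rename, so a second (speculative) attempt of the same task cannot also commit, and job commit performs one rename per output file. This is a toy sketch under stated assumptions: the directory layout follows the comment, the committed directory is named after the task, the task and attempt IDs are invented, and abort handling, {{_SUCCESS}} markers and everything else the real FileOutputCommitter does are omitted.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Toy model of the FileOutputCommitter v1 layout and commit steps on a local filesystem. */
final class V1CommitSketch {
    private V1CommitSketch() {}

    static Path jobAttemptDir(Path dest, int appAttempt) {
        return dest.resolve("_temporary").resolve(Integer.toString(appAttempt));
    }

    /** Where a task attempt writes: $dest/_temporary/$appAttempt/_temporary/$taskAttempt */
    static Path taskAttemptDir(Path dest, int appAttempt, String taskAttempt) {
        return jobAttemptDir(dest, appAttempt).resolve("_temporary").resolve(taskAttempt);
    }

    /**
     * Task commit: one directory rename into $dest/_temporary/$appAttempt/$task.
     * Files.move without REPLACE_EXISTING fails if the target exists, so a second
     * attempt of the same task cannot commit. HDFS rename gives that guarantee
     * atomically; the JDK's existence check is not atomic, which is fine for a demo.
     */
    static void commitTask(Path dest, int appAttempt, String taskAttempt, String task) throws IOException {
        Files.move(taskAttemptDir(dest, appAttempt, taskAttempt), jobAttemptDir(dest, appAttempt).resolve(task));
    }

    /** Job commit: merge every committed task dir into dest (one rename per file), then clean up. */
    static int commitJob(Path dest, int appAttempt) throws IOException {
        List<Path> taskDirs = new ArrayList<>();
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(jobAttemptDir(dest, appAttempt))) {
            for (Path p : ds) {
                // Skip the _temporary dir holding uncommitted task attempts.
                if (!p.getFileName().toString().equals("_temporary")) taskDirs.add(p);
            }
        }
        int renames = 0;
        for (Path taskDir : taskDirs) {
            List<Path> files;
            try (Stream<Path> s = Files.walk(taskDir)) {
                files = s.filter(Files::isRegularFile).collect(Collectors.toList());
            }
            for (Path file : files) {
                Path target = dest.resolve(taskDir.relativize(file));
                Files.createDirectories(target.getParent());
                Files.move(file, target);
                renames++;
            }
        }
        deleteRecursively(dest.resolve("_temporary"));
        return renames;
    }

    /** Deletes children before parents by walking in reverse lexical order. */
    static void deleteRecursively(Path root) throws IOException {
        List<Path> paths;
        try (Stream<Path> s = Files.walk(root)) {
            paths = s.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) Files.delete(p);
    }
}
```

The returned rename count makes the cost split visible: task commit is one rename per task, job commit is one rename per file, and on an object store where rename is really a copy, both become O(data).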
[jira] [Commented] (SPARK-20703) Add an operator for writing data out
[ https://issues.apache.org/jira/browse/SPARK-20703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088632#comment-16088632 ] Steve Loughran commented on SPARK-20703: Got a patch for this, but I want to see if I can create a unit test for it: an outputWriter which temporarily renamed the output file would do it, but then it needs to rename it back again before the next stage in the process. Or maybe not: that's what tests are for. Expect a PR in August.
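The test idea in this comment (hide the output file while statistics are collected, then put it back before the next stage) might look like the following plain-Java sketch. {{sizeOrZero}} is a hypothetical stand-in for the stats code under test, not Spark's output writer.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch of the test idea: hide the output file while stats are collected, then restore it. */
final class HiddenFileTest {
    private HiddenFileTest() {}

    /** Hypothetical stats hook under test: must tolerate a missing file. */
    static long sizeOrZero(Path p) {
        try {
            return Files.size(p);
        } catch (IOException e) {
            return 0;
        }
    }

    /** Runs the stats hook while the file is renamed away; always renames it back. */
    static long sizeWhileHidden(Path file) throws IOException {
        Path hidden = file.resolveSibling(file.getFileName() + ".hidden");
        Files.move(file, hidden);
        try {
            return sizeOrZero(file);
        } finally {
            Files.move(hidden, file); // restore before the "next stage" reads it
        }
    }
}
```

The {{finally}} block is the key design choice: the file is renamed back even if the code under test throws, so later stages still find their input.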
[jira] [Commented] (SPARK-20107) Add spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version option to configuration.md
[ https://issues.apache.org/jira/browse/SPARK-20107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16073951#comment-16073951 ] Steve Loughran commented on SPARK-20107: If you are curious, I've just written out the v1 and v2 commit algorithms with cost/complexity estimates: https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md > Add spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version option to > configuration.md > --- > > Key: SPARK-20107 > URL: https://issues.apache.org/jira/browse/SPARK-20107 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 2.1.0 >Reporter: Yuming Wang >Assignee: Yuming Wang >Priority: Trivial > Fix For: 2.2.0 > > > Set {{spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2}} can > speed up > [HadoopMapReduceCommitProtocol#commitJob|https://github.com/apache/spark/blob/v2.1.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L121] > for many output files. > It can speed up {{11 minutes}} for 216869 output files: > {code:sql} > CREATE TABLE tmp.spark_20107 AS SELECT > category_id, > product_id, > track_id, > concat( > substr(ds, 3, 2), > substr(ds, 6, 2), > substr(ds, 9, 2) > ) shortDate, > CASE WHEN actiontype = '0' THEN 'browse' WHEN actiontype = '1' THEN 'fav' > WHEN actiontype = '2' THEN 'cart' WHEN actiontype = '3' THEN 'order' ELSE > 'invalid actio' END AS type > FROM > tmp.user_action > WHERE > ds > date_sub('2017-01-23', 730) > AND actiontype IN ('0','1','2','3'); > {code} > {code} > $ hadoop fs -ls /user/hive/warehouse/tmp.db/spark_20107 | wc -l > 216870 > {code} > We should add this option to > [configuration.md|http://spark.apache.org/docs/latest/configuration.html]. 
> All cloudera's hadoop 2.6.0-cdh5.4.0 or higher versions(see: > [cloudera/hadoop-common@1c12361|https://github.com/cloudera/hadoop-common/commit/1c1236182304d4075276c00c4592358f428bc433] > and > [cloudera/hadoop-common@16b2de2|https://github.com/cloudera/hadoop-common/commit/16b2de27321db7ce2395c08baccfdec5562017f0]) > and apache's hadoop 2.7.0 or higher versions support this improvement.
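The option in this issue's title reaches the committer because Spark copies every {{spark.hadoop.*}} property into the Hadoop configuration with the prefix removed, so the committer sees {{mapreduce.fileoutputcommitter.algorithm.version=2}}. A minimal model of that prefix stripping on plain Maps (not Spark's own classes) looks like this:

```java
import java.util.HashMap;
import java.util.Map;

/** Models how spark.hadoop.* settings become Hadoop configuration entries. */
final class SparkHadoopConf {
    static final String PREFIX = "spark.hadoop.";

    private SparkHadoopConf() {}

    /** Copies spark.hadoop.X=V entries to X=V and ignores every other Spark setting. */
    static Map<String, String> toHadoopConf(Map<String, String> sparkConf) {
        Map<String, String> hadoop = new HashMap<>();
        for (Map.Entry<String, String> e : sparkConf.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                hadoop.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return hadoop;
    }
}
```

In practice that means passing {{--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2}} to spark-submit. As discussed on SPARK-19790, v2 trades fewer renames for the loss of recovery from a failure during task commit.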
[jira] [Commented] (SPARK-12868) ADD JAR via sparkSQL JDBC will fail when using a HDFS URL
[ https://issues.apache.org/jira/browse/SPARK-12868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068301#comment-16068301 ] Steve Loughran commented on SPARK-12868: It's actually not the cause of that, merely the messenger. The cause is HADOOP-14383: the combination of Spark 2.2 and Hadoop 2.9+ triggers the problem, so the fix belongs in Hadoop.

> ADD JAR via sparkSQL JDBC will fail when using a HDFS URL > - > > Key: SPARK-12868 > URL: https://issues.apache.org/jira/browse/SPARK-12868 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Trystan Leftwich >Assignee: Weiqing Yang > Fix For: 2.2.0 > > > When trying to add a jar with a HDFS URI, i.e. > {code:sql} > ADD JAR hdfs:///tmp/foo.jar > {code} > Via the spark sql JDBC interface it will fail with: > {code} > java.net.MalformedURLException: unknown protocol: hdfs > at java.net.URL.<init>(URL.java:593) > at java.net.URL.<init>(URL.java:483) > at java.net.URL.<init>(URL.java:432) > at java.net.URI.toURL(URI.java:1089) > at > org.apache.spark.sql.hive.client.ClientWrapper.addJar(ClientWrapper.scala:578) > at org.apache.spark.sql.hive.HiveContext.addJar(HiveContext.scala:652) > at org.apache.spark.sql.hive.execution.AddJar.run(commands.scala:89) > at > org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) > at > org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) > at > org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) > at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) > at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:145) > at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130) > at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52) > at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817) > at > org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:211) > at > org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:154) > at > org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:151) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) > at > org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:164) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code}
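The {{MalformedURLException: unknown protocol: hdfs}} at the top of this trace comes from {{java.net.URL}} itself: the JDK has no handler for the hdfs scheme until a {{URLStreamHandlerFactory}} is registered. The sketch below reproduces that with the JDK alone. The stub handler and the factory lambda are hypothetical stand-ins for Hadoop's {{FsUrlStreamHandlerFactory}}; the stub only makes hdfs URLs parseable and cannot open them.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;

/** Reproduces "unknown protocol: hdfs" and shows that registering a handler factory fixes parsing. */
final class HdfsUrlDemo {
    private HdfsUrlDemo() {}

    /** Placeholder handler: parses hdfs:// URLs but cannot open them (no HDFS client here). */
    static final class StubHandler extends URLStreamHandler {
        @Override
        protected URLConnection openConnection(URL u) throws IOException {
            throw new IOException("stub handler cannot open " + u);
        }
    }

    /** True if java.net.URL can parse the spec, false on MalformedURLException. */
    static boolean canParse(String spec) {
        try {
            new URL(spec);
            return true;
        } catch (MalformedURLException e) {
            return false; // e.g. "unknown protocol: hdfs"
        }
    }

    /** Registers a JVM-wide factory claiming only hdfs; the JDK allows this once per JVM. */
    static void registerHdfsHandler() {
        URL.setURLStreamHandlerFactory(
            protocol -> "hdfs".equals(protocol) ? new StubHandler() : null);
    }
}
```

Calling {{canParse("hdfs:///tmp/foo.jar")}} returns false before {{registerHdfsHandler()}} and true afterwards.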
[jira] [Commented] (SPARK-21137) Spark reads many small files slowly
[ https://issues.apache.org/jira/browse/SPARK-21137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065423#comment-16065423 ] Steve Loughran commented on SPARK-21137: Looking at this, something is trying to get the permissions for every file, which is being dealt with by an exec() and all the overheads of that. Looking at the code, it's in the constructor of {{LocatedFileStatus}}, which builds itself from another {{FileStatus}}. That is normally just a simple copy of fields (fast, efficient), but on RawLocalFileSystem it actually triggers an on-demand execution. This has been around for a long time (HADOOP-2288) and is surfacing here because you're working with the local FS; for all other filesystems it's a quick operation.

I think this is an issue. I don't think anybody thought this would be a problem, as it's just viewed as the marshalling of a LocatedFileStatus, which is what you get back from {{FileSystem.listLocatedStatus}}. Normally that's the higher-performing call, not just on object stores, but because it scales better: it can incrementally send back data in batches, rather than needing to enumerate an entire directory of files (possibly in the millions) and then send them around as arrays of FileStatus. Here, it's clearly not.

What to do? We could consider whether it'd be possible to add this to the Hadoop native libs and so make a fast API call. There's also the option of "allowing us to disable permissions entirely". That one appeals to me more from a Windows perspective, where you could get rid of the Hadoop native lib and still have (most) things work there... but as it's an incomplete "most", it's probably an optimistic goal.

> Spark reads many small files slowly > --- > > Key: SPARK-21137 > URL: https://issues.apache.org/jira/browse/SPARK-21137 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: sam >Priority: Minor > > A very common use case in big data is to read a large number of small files. > For example the Enron email dataset has 1,227,645 small files. > When one tries to read this data using Spark one will hit many issues. > Firstly, even if the data is small (each file only say 1K) any job can take a > very long time (I have a simple job that has been running for 3 hours and has > not yet got to the point of starting any tasks, I doubt if it will ever > finish). > It seems all the code in Spark that manages file listing is single threaded > and not well optimised. When I hand crank the code and don't use Spark, my > job runs much faster. > Is it possible that I'm missing some configuration option? It seems kinda > surprising to me that Spark cannot read Enron data given that it's such a > quintessential example. > So it takes 1 hour to output a line "1,227,645 input paths to process", it > then takes another hour to output the same line. Then it outputs a CSV of all > the input paths (so creates a text storm). > Now it's been stuck on the following: > {code} > 17/06/19 09:31:07 INFO LzoCodec: Successfully loaded & initialized native-lzo > library [hadoop-lzo rev 154f1ef53e2d6ed126b0957d7995e0a610947608] > {code} > for 2.5 hours. > So I've provided full reproduce steps here (including code and cluster setup) > https://github.com/samthebest/scenron, scroll down to "Bug In Spark". You can > easily just clone, and follow the README to reproduce exactly!
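The cost pattern described in this comment, a copy constructor calling a getter whose first use triggers an expensive per-file lookup, can be modelled in a few lines. {{LazyStatus}} and {{CopiedStatus}} are hypothetical stand-ins for the local filesystem's status class and {{LocatedFileStatus}}; the exec() is simulated by a counter, so this shows the scaling (one expensive call per listed file), not real timings.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: a file status whose permission is loaded on demand by an expensive call. */
final class LazyStatus {
    static int expensiveLoads = 0; // counts simulated exec() calls (single-threaded demo)

    private final String path;
    private String permission; // null until first requested

    LazyStatus(String path) { this.path = path; }

    String getPath() { return path; }

    /** Stands in for the per-file exec() that the local FS performs on first access. */
    String getPermission() {
        if (permission == null) {
            expensiveLoads++;
            permission = "rw-r--r--";
        }
        return permission;
    }
}

/** Copy-constructing wrapper, like building a located status from another status. */
final class CopiedStatus {
    final String path;
    final String permission;

    CopiedStatus(LazyStatus source) {
        this.path = source.getPath();
        this.permission = source.getPermission(); // forces the expensive load for every file
    }

    static List<CopiedStatus> copyAll(List<LazyStatus> listing) {
        List<CopiedStatus> out = new ArrayList<>();
        for (LazyStatus s : listing) out.add(new CopiedStatus(s));
        return out;
    }
}
```

Copying a listing of 1,000 statuses triggers 1,000 loads; for a dataset like the 1,227,645 Enron files, that is over a million process spawns before any task starts.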
[jira] [Commented] (SPARK-21137) Spark reads many small files slowly
[ https://issues.apache.org/jira/browse/SPARK-21137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065445#comment-16065445 ] Steve Loughran commented on SPARK-21137: Filed HADOOP-14600. It looks like a very old codepath that hasn't been looked at in a while; since then, the native lib's fstat call should be able to do this, with the old code retained for when {{NativeCodeLoader.isNativeCodeLoaded() == false}}.

Sam, you said
bq. it's likely that the underlying Hadoop APIs have some yucky code that does something silly, I have delved down their before and my stomach cannot handle it

Oh, it's not so bad. At least you don't have to go near the assembly code bit; that we are all scared of. Or worse, Kerberos. Anyway, patches welcome there, with tests.
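The direction filed as HADOOP-14600, an in-process stat when native code is loaded and the old exec-based path otherwise, can be sketched with the JDK on a POSIX system. {{Files.getPosixFilePermissions}} stands in for the native fstat, spawning {{ls -ld}} stands in for the exec fallback, and the {{nativeAvailable}} flag plays the role of {{NativeCodeLoader.isNativeCodeLoaded()}}. This is an illustration, not the Hadoop patch.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;

/** Sketch of "use an in-process stat when available, fall back to exec otherwise". */
final class PermissionLookup {
    private PermissionLookup() {}

    /** Returns permissions as a 9-character string such as "rw-r-----". */
    static String permissions(Path file, boolean nativeAvailable) throws IOException, InterruptedException {
        if (nativeAvailable) {
            // In-process stat: a system call, no child process.
            return PosixFilePermissions.toString(Files.getPosixFilePermissions(file));
        }
        // Fallback: spawn `ls -ld` and parse the mode column, paying for one process per file.
        Process p = new ProcessBuilder("ls", "-ld", file.toString()).redirectErrorStream(true).start();
        String line;
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
            line = r.readLine();
        }
        if (p.waitFor() != 0 || line == null) throw new IOException("ls failed for " + file);
        return line.substring(1, 10); // drop the file-type character, keep the rwx bits
    }
}
```

Both paths return the same answer; the difference is that the fallback forks a process per file, which is exactly what dominates when listing a million small files.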
[jira] [Commented] (SPARK-21137) Spark reads many small files slowly
[ https://issues.apache.org/jira/browse/SPARK-21137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065428#comment-16065428 ] Steve Loughran commented on SPARK-21137: PS: for now, do it in parallel: {{mapreduce.input.fileinputformat.list-status.num-threads}}. Even there, though, the fact that every thread will be exec()ing code can make it expensive (it looks like a full POSIX spawn on a Mac).
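Setting {{mapreduce.input.fileinputformat.list-status.num-threads}} above 1 makes FileInputFormat list its input paths on a thread pool instead of sequentially. The underlying idea, one listing task per directory on a bounded pool, is sketched below with the JDK alone; {{ParallelLister}} is hypothetical and is not Hadoop's implementation. From Spark, the property can be passed with the {{spark.hadoop.}} prefix, e.g. {{--conf spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads=16}} (the thread count here is arbitrary).

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Lists many input directories with a fixed thread pool, one task per directory. */
final class ParallelLister {
    private ParallelLister() {}

    static List<Path> listAll(List<Path> dirs, int numThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<Future<List<Path>>> futures = new ArrayList<>();
            for (Path dir : dirs) {
                futures.add(pool.submit(() -> {
                    try (Stream<Path> s = Files.list(dir)) {
                        return s.collect(Collectors.toList());
                    }
                }));
            }
            List<Path> all = new ArrayList<>();
            for (Future<List<Path>> f : futures) {
                all.addAll(f.get()); // rethrows any listing failure as ExecutionException
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }
}
```

As the comment warns, on the local FS more threads also means more concurrent exec() calls, so the speed-up may be smaller than the thread count suggests.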
[jira] [Commented] (SPARK-12868) ADD JAR via sparkSQL JDBC will fail when using a HDFS URL
[ https://issues.apache.org/jira/browse/SPARK-12868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065448#comment-16065448 ] Steve Loughran commented on SPARK-12868: I think this is a case of HADOOP-14598: once the JVM's URL stream handler factory has been set to {{FsUrlStreamHandlerFactory}} in {{org.apache.spark.sql.internal.SharedState}}, you can't talk to Azure.
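{{java.net.URL}} accepts exactly one {{URLStreamHandlerFactory}} per JVM, and a second call to {{URL.setURLStreamHandlerFactory}} throws an {{Error}}. Whichever factory is installed therefore decides how every overridable URL scheme is resolved, including http and https, which object store clients may rely on. Assuming that is the mechanism behind the Azure breakage, one defensive design is a factory that claims only the schemes it must and returns null for everything else, so the JDK's built-in handlers take over. A self-contained sketch with a hypothetical {{SelectiveFactory}} and a stub handler:

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
import java.util.Set;

/**
 * Sketch: a JVM-wide URL handler factory that claims only the schemes it must,
 * returning null for everything else so http/https keep the JDK's handlers.
 */
final class SelectiveFactory implements URLStreamHandlerFactory {
    private final Set<String> claimed;

    SelectiveFactory(Set<String> claimed) { this.claimed = claimed; }

    @Override
    public URLStreamHandler createURLStreamHandler(String protocol) {
        if (!claimed.contains(protocol)) {
            return null; // decline: the JDK falls back to its own handler
        }
        return new URLStreamHandler() {
            @Override
            protected URLConnection openConnection(URL u) throws IOException {
                throw new IOException("filesystem handler stub for " + u);
            }
        };
    }

    /** True if the URL is served by the JDK's HTTP implementation (no connection is made). */
    static boolean usesJdkHttp(String spec) throws IOException {
        return new URL(spec).openConnection() instanceof HttpURLConnection;
    }
}
```

Registering {{new SelectiveFactory(Set.of("hdfs"))}} makes hdfs URLs parse while {{http://}} URLs still get an {{HttpURLConnection}}; because only one factory is ever accepted, the choice of which schemes to claim cannot be corrected later in the same JVM.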
[jira] [Commented] (SPARK-21137) Spark reads many small files slowly
[ https://issues.apache.org/jira/browse/SPARK-21137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065515#comment-16065515 ] Steve Loughran commented on SPARK-21137:
bq. so it is something that could be optimized in the Hadoop API, and seems somewhat specific to a local filesystem.

Yeah, it's something unique to the local FS (and maybe things which extend it, like glusterfs; [~jayunit100]?). The fix would be straightforward if someone sits down and writes the test: ideally a scale one which creates 1K+ small files and then measures the difference in {{listStatus}} times before and after the change.
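A scale test along the lines suggested here might start from a harness like this: create N small files, then time a plain listing against a listing that also fetches each file's permissions. The class and method names are hypothetical, and the per-file lookup is the in-process JDK call, so measuring the pre-fix behaviour would mean substituting an exec-based lookup. Single timings like these are rough (no JIT warm-up, OS caches vary), so treat the printed numbers as indicative only.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFileAttributes;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Small-files listing benchmark: create N files, then time listing with and without per-file stats. */
final class SmallFileListingBench {
    private SmallFileListingBench() {}

    /** Creates n one-byte files in a fresh temporary directory. */
    static Path createSmallFiles(int n) throws IOException {
        Path dir = Files.createTempDirectory("small-files");
        for (int i = 0; i < n; i++) {
            Files.write(dir.resolve(String.format("part-%05d", i)), new byte[] {1});
        }
        return dir;
    }

    /** Plain listing: one directory read, no per-file metadata. */
    static List<Path> list(Path dir) throws IOException {
        try (Stream<Path> s = Files.list(dir)) {
            return s.collect(Collectors.toList());
        }
    }

    /** Listing plus one permission lookup per file; returns the number of files seen. */
    static int listWithPermissions(Path dir) throws IOException {
        int count = 0;
        for (Path p : list(dir)) {
            Files.readAttributes(p, PosixFileAttributes.class).permissions();
            count++;
        }
        return count;
    }

    /** Runs the action once and returns the elapsed wall-clock time in milliseconds. */
    static long timeMillis(Callable<?> action) throws Exception {
        long start = System.nanoTime();
        action.call();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        Path dir = createSmallFiles(args.length > 0 ? Integer.parseInt(args[0]) : 1000);
        System.out.println("list only:       " + timeMillis(() -> list(dir)) + " ms");
        System.out.println("list + per-file: " + timeMillis(() -> listWithPermissions(dir)) + " ms");
    }
}
```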
[jira] [Commented] (SPARK-7481) Add spark-hadoop-cloud module to pull in object store support
[ https://issues.apache.org/jira/browse/SPARK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984771#comment-15984771 ] Steve Loughran commented on SPARK-7481: --- I think we ended up going in circles on that PR. Sean has actually been very tolerant of me; however, progress has been hampered by my full-time focus on other things. I've only had time to work on the Spark PR intermittently, and that's been hard for all: me in the rebase/retest, the one reviewer in having to catch up again. Now, anyone who does manage to get that classpath right will discover that S3A absolutely flies with Spark: in partitioning (list file improvements), data input (set {{fs.s3a.experimental.input.fadvise=random}} for ORC and Parquet), and output (set {{fs.s3a.fast.upload=true}}, play with the pool options). It delivers that performance because this patch sets things up for the integration tests downstream of it, so that I and others can be confident that things actually work, at speed, at scale. Indeed, much of the S3A performance work was actually based on Hive and Spark workloads: the data formats and their seek patterns, directory layouts, file generation. All that's left is the little problem of getting the classpath right. Oh, and the committer. For now, for people's enjoyment, here are some videos from Spark Summit East on the topic:
* [Spark and object stores|https://youtu.be/8F2Jqw5_OnI].
* [Robust and Scalable etl over Cloud Storage With Spark|https://spark-summit.org/east-2017/events/robust-and-scalable-etl-over-cloud-storage-with-spark/] > Add spark-hadoop-cloud module to pull in object store support > - > > Key: SPARK-7481 > URL: https://issues.apache.org/jira/browse/SPARK-7481 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 2.1.0 >Reporter: Steve Loughran > > To keep the s3n classpath right, to add s3a, swift & azure, the dependencies > of spark in a 2.6+ profile need to add the relevant object store packages > (hadoop-aws, hadoop-openstack, hadoop-azure)
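The S3A input and output options mentioned in the comment above correspond, as far as we can tell, to the Hadoop 2.8 properties below; mapping the comment's informal names onto these keys is our assumption, and the values are illustrative rather than recommendations from the comment. Spark copies every {{spark.hadoop.*}} entry into the Hadoop configuration it creates, which is how these reach the S3A client. {{S3ATuning}} is a hypothetical helper.

```scala
object S3ATuning {

  /** Hadoop-side S3A options (Hadoop 2.8+ names). */
  val hadoopOptions: Map[String, String] = Map(
    // random-access reads suit the footer-then-columns seek pattern of ORC and Parquet
    "fs.s3a.experimental.input.fadvise" -> "random",
    // upload blocks incrementally while the file is still being written
    "fs.s3a.fast.upload" -> "true",
    // buffer pending blocks on local disk rather than in memory
    "fs.s3a.fast.upload.buffer" -> "disk"
  )

  /** Prefix each key with "spark.hadoop." so Spark passes it through to Hadoop. */
  def asSparkConf(options: Map[String, String]): Map[String, String] =
    options.map { case (k, v) => s"spark.hadoop.$k" -> v }

  def main(args: Array[String]): Unit =
    asSparkConf(hadoopOptions).toSeq.sorted.foreach { case (k, v) => println(s"--conf $k=$v") }
}
```

Running {{main}} prints the options as {{--conf}} arguments ready for {{spark-submit}}.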
[jira] [Commented] (SPARK-7481) Add spark-hadoop-cloud module to pull in object store support
[ https://issues.apache.org/jira/browse/SPARK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985040#comment-15985040 ] Steve Loughran commented on SPARK-7481: --- (This is a fairly long comment, but it tries to summarise the entire state of interaction with object stores, esp. S3A on Hadoop 2.8+. Azure is simpler; GCS is Google's problem; Swift is not used very much.) If you look at object stores and Spark (or indeed, any code which uses a filesystem as the source and destination of work), there are problems which can generally be grouped into various categories.
h3. Foundational: talking to the object stores
Classpath & execution: can you wire the JARs up? This is a longstanding issue in ASF Spark releases (SPARK-5348, SPARK-12557). It was exacerbated by the movement of s3n:// to the hadoop-aws package (FWIW, I hadn't noticed that move; I'd have blocked it if I'd been paying attention). This includes transitive problems (SPARK-11413).
Credential propagation. Spark's env var propagation is pretty cute here; SPARK-19739 picks up {{AWS_SESSION_TOKEN}} too. Diagnostics on failure are a real pain.
h3. Observable Inconsistencies leading to Data loss
Generally where the metaphor "it's just a filesystem" fails. These are bad because they often "just work", especially in dev & test with small datasets, and when they go wrong, they can fail by generating bad results *and nobody notices*.
* Expectations of consistent listing of "directories". S3Guard deals with this (HADOOP-13345), as can Netflix's S3mper and AWS's premium DynamoDB-backed S3 storage.
* Expectations on the transacted nature of directory renames, the core atomic commit operation against full filesystems.
* Expectations that when things are deleted they go away. This does become visible sometimes, usually in checks for a destination not existing (SPARK-19013).
* Expectations that write-in-progress data is visible/flushed, and that {{close()}} is low cost (SPARK-19111).
Committing pretty much combines all of these; see below for more details.
h3. Aggressively bad performance
That's the mismatch between what the object store offers, what the apps expect, and the metaphor work in the Hadoop FileSystem implementations, which, in trying to hide the conceptual mismatch, can actually amplify the problem. Example: directory tree scanning at the start of a query. The mock directory structure allows callers to do treewalks, when really a full list of all children can be obtained directly with a flat listing, without walking the tree. SPARK-17159 covers some of this for scanning directories in Spark Streaming, but there's a hidden treewalk in every call to {{FileSystem.globStatus()}} (HADOOP-13371). Given how S3Guard transforms this treewalk, and you need it for consistency, that's probably the best solution for now. Although I have a PoC which does a full list of {{**/*}} followed by a filter, that's not viable when you have a wide, deep tree and do need to prune aggressively. Checkpointing to object stores is similar: it's generally not dangerous to do the write+rename; it just adds the copy overhead, consistency issues notwithstanding.
h3. Suboptimal code
There are opportunities for speedup, but if the code isn't on the critical path, they're not worth the hassle. That said, as every call to {{getFileStatus()}} can take hundreds of milliseconds, they get onto the critical path quite fast. Examples: checks for a file existing before calling {{fs.delete(path)}} (which is already a no-op if the destination path isn't there), and the equivalent on mkdirs: {{if (!fs.exists(dir)) fs.mkdirs(dir)}}. Hadoop 3.0 will help steer people onto the path of righteousness there by deprecating a couple of methods which encourage inefficiencies (isFile/isDir).
h3. The commit problem
The full commit problem combines all of these: you need a consistent list of source data; your deleted destination path mustn't appear in listings; the commit of each task must promote that task's work to the pending output of the job; and an abort must leave no trace of it. The final job commit must place data into the final destination; again, a job abort must not make any output visible. There's some ambiguity about what happens if task and job commits fail; generally the safest option is "abort everything". Furthermore, nobody has any idea what to do if an {{abort()}} raises exceptions. Oh, and all of this must be fast. Spark is no better or worse than the core MapReduce committers here, or than Hive's. Spark generally uses the Hadoop {{FileOutputFormat}} via the {{HadoopMapReduceCommitProtocol}}, directly or indirectly (e.g. {{ParquetOutputFormat}}), extracting its committer and casting it to {{FileOutputCommitter}}, primarily to get a working directory. This committer assumes the destination is a consistent FS and uses renames when promoting task and job output, assuming renames are so fast that it doesn't even bother to log an "about to rename" message. Hence the recurrent Stack Overflow
[jira] [Commented] (SPARK-17159) Improve FileInputDStream.findNewFiles list performance
[ https://issues.apache.org/jira/browse/SPARK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981081#comment-15981081 ] Steve Loughran commented on SPARK-17159: Pulled the documentation out into a separate JIRA, SPARK-20448, for independent review. > Improve FileInputDStream.findNewFiles list performance > -- > > Key: SPARK-17159 > URL: https://issues.apache.org/jira/browse/SPARK-17159 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.0.0 > Environment: spark against object stores >Reporter: Steve Loughran >Priority: Minor > > {{FileInputDStream.findNewFiles()}} is doing a globStatus with a filter that > calls getFileStatus() on every file, takes the output and does listStatus() > on the output. > This is going to suffer on object stores, as dir listing and getFileStatus calls > are so expensive. It's clear this is a problem, as the method has code to > detect timeouts in the window and warn of problems. > It should be possible to make this faster
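To see why a hidden treewalk hurts on an object store, here is a toy cost model; every name in it is hypothetical, not a Hadoop or Spark API. The store holds a flat set of keys. A filesystem-style walk issues one delimited LIST per "directory", while a flat listing issues one LIST for the whole prefix. Pagination is ignored: real S3 returns at most 1,000 keys per LIST response, so a flat listing actually costs about one request per thousand objects, which is still independent of directory depth.

```scala
object ListingCostModel {

  /** Toy object store: a flat set of keys plus a counter of LIST requests. */
  final class ToyStore(keys: Set[String]) {
    var listRequests = 0

    /** Delimited LIST: the immediate "files" and "subdirectory" prefixes under `prefix`. */
    def listDelimited(prefix: String): (Seq[String], Seq[String]) = {
      listRequests += 1
      val children = keys.toSeq.filter(_.startsWith(prefix)).map(_.stripPrefix(prefix))
      val files = children.filterNot(_.contains("/")).map(prefix + _)
      val dirs = children.filter(_.contains("/")).map(c => prefix + c.takeWhile(_ != '/') + "/").distinct
      (files.sorted, dirs.sorted)
    }

    /** Flat LIST: every key under `prefix` in a single request (pagination ignored). */
    def listFlat(prefix: String): Seq[String] = {
      listRequests += 1
      keys.toSeq.filter(_.startsWith(prefix)).sorted
    }
  }

  /** Filesystem-style recursive walk: one delimited LIST per "directory". */
  def treewalk(store: ToyStore, prefix: String): Seq[String] = {
    val (files, dirs) = store.listDelimited(prefix)
    files ++ dirs.flatMap(d => treewalk(store, d))
  }

  def main(args: Array[String]): Unit = {
    // 10 day partitions x 10 hour partitions x 5 files = 500 objects
    val keys = for { d <- 1 to 10; h <- 0 until 10; f <- 0 until 5 }
      yield s"data/day=$d/hour=$h/part-$f.csv"
    val walkStore = new ToyStore(keys.toSet)
    val walked = treewalk(walkStore, "data/")
    val flatStore = new ToyStore(keys.toSet)
    val flat = flatStore.listFlat("data/")
    println(s"treewalk: ${walked.size} files in ${walkStore.listRequests} LISTs")
    println(s"flat:     ${flat.size} files in ${flatStore.listRequests} LIST")
  }
}
```

With this layout the walk costs 1 + 10 + 100 = 111 LIST requests against 1 for the flat listing, and every extra level of partitioning multiplies the gap. A filter that then calls {{getFileStatus()}} per file, as {{findNewFiles()}} does, adds one more request per file on top.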
[jira] [Created] (SPARK-20448) Document how FileInputDStream works with object storage
Steve Loughran created SPARK-20448: -- Summary: Document how FileInputDStream works with object storage Key: SPARK-20448 URL: https://issues.apache.org/jira/browse/SPARK-20448 Project: Spark Issue Type: Documentation Components: Documentation Affects Versions: 2.1.0 Reporter: Steve Loughran Priority: Minor Object stores work differently from filesystems: intermediate writes are not visible, and renames are really O(data) copies, not O(1) transactions. This makes working with them as DStreams fundamentally different: you can write straight into the destination.
# Document how {{FileInputDStream}} scans directories for changes.
# Document how object stores behave differently, and the implications for users.
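A toy model of the rename point above; the names are hypothetical and this is not a real client API. Because the store has no rename primitive, a "directory" rename has to be emulated as one COPY followed by one DELETE per object, so its cost grows with the bytes moved, and a reader can observe a half-renamed tree partway through.

```scala
import scala.collection.mutable

object RenameCostModel {

  /** Toy blob store: key -> object size in bytes, plus a count of bytes copied server-side. */
  final class ToyBlobStore {
    val objects: mutable.Map[String, Long] = mutable.Map.empty
    var bytesCopied = 0L

    def put(key: String, size: Long): Unit = objects.update(key, size)

    /** Emulated "directory" rename: COPY then DELETE, one object at a time. O(bytes), not O(1). */
    def renameDir(src: String, dest: String): Unit = {
      val toMove = objects.keys.filter(_.startsWith(src)).toList.sorted
      for (key <- toMove) {
        val size = objects(key)
        objects.update(dest + key.stripPrefix(src), size) // COPY: both keys now exist
        bytesCopied += size
        objects.remove(key)                                // DELETE: the source goes only now
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new ToyBlobStore
    val mib = 1024L * 1024
    (0 until 8).foreach(i => store.put(s"out/_temporary/part-$i", 128 * mib))
    store.renameDir("out/_temporary/", "out/")
    println(s"renamed ${store.objects.size} objects, copying ${store.bytesCopied / mib} MiB")
  }
}
```

On a real filesystem the same rename is a single metadata update; here eight 128 MiB files mean copying 1 GiB, which is why writing directly into the destination is the better pattern for object stores.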
[jira] [Commented] (SPARK-21374) Reading globbed paths from S3 into DF doesn't work if filesystem caching is disabled
[ https://issues.apache.org/jira/browse/SPARK-21374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108897#comment-16108897 ] Steve Loughran commented on SPARK-21374: I understand... the patch shows the issue. It's only working in some codepaths because the (authenticated) S3 FS instance was already created and cached. > Reading globbed paths from S3 into DF doesn't work if filesystem caching is > disabled > > > Key: SPARK-21374 > URL: https://issues.apache.org/jira/browse/SPARK-21374 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.2, 2.1.1 >Reporter: Andrey Taptunov > > *Motivation:* > In my case I want to disable filesystem cache to be able to change S3's > access key and secret key on the fly to read from buckets with different > permissions. This works perfectly fine for RDDs but doesn't work for DFs. > *Example (works for RDD but fails for DataFrame):* > {code:java} > import org.apache.spark.SparkContext > import org.apache.spark.SparkConf > import org.apache.spark.sql.SparkSession > object SimpleApp { > def main(args: Array[String]) { > val awsAccessKeyId = "something" > val awsSecretKey = "something else" > val conf = new SparkConf().setAppName("Simple > Application").setMaster("local[*]") > val sc = new SparkContext(conf) > sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", awsAccessKeyId) > sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", awsSecretKey) > sc.hadoopConfiguration.setBoolean("fs.s3.impl.disable.cache",true) > > sc.hadoopConfiguration.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem") > sc.hadoopConfiguration.set("fs.s3.buffer.dir","/tmp") > val spark = SparkSession.builder().config(conf).getOrCreate() > val rddFile = sc.textFile("s3://bucket/file.csv").count // ok > val rddGlob = sc.textFile("s3://bucket/*").count // ok > val dfFile = spark.read.format("csv").load("s3://bucket/file.csv").count > // ok > > val dfGlob = 
spark.read.format("csv").load("s3://bucket/*").count > // IllegalArgumentException. AWS Access Key ID and Secret Access Key must > be specified as the username or password (respectively) > // of a s3 URL, or by setting the fs.s3.awsAccessKeyId or > fs.s3.awsSecretAccessKey properties (respectively). > > sc.stop() > } > } > {code}
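A toy model of the caching effect described in the comment above; all names are hypothetical. Clients capture their credentials when created and are cached per (scheme, authority). The real Hadoop {{FileSystem}} cache key also includes the calling user, and this sketch does not reproduce Spark's DataFrame globbing code path; it only shows why a code path holding a configuration without credentials can still appear to work while caching is enabled, and fails once {{fs.s3.impl.disable.cache}} is set.

```scala
import java.net.URI
import scala.collection.mutable

object FsCacheModel {

  /** Stand-in for a filesystem client: it captures its credentials when it is created. */
  final case class Client(scheme: String, authority: String, accessKey: Option[String])

  /** Toy client factory with a cache keyed on (scheme, authority) and a per-scheme opt-out. */
  final class ClientFactory {
    private val cache = mutable.Map.empty[(String, String), Client]
    var created = 0

    def get(uri: URI, conf: Map[String, String]): Client = {
      val scheme = uri.getScheme
      def create(): Client = {
        created += 1
        Client(scheme, uri.getAuthority, conf.get(s"fs.$scheme.awsAccessKeyId"))
      }
      if (conf.get(s"fs.$scheme.impl.disable.cache").contains("true")) create()
      else cache.getOrElseUpdate((scheme, uri.getAuthority), create())
    }
  }

  def main(args: Array[String]): Unit = {
    val factory = new ClientFactory
    val uri = new URI("s3://bucket/file.csv")
    factory.get(uri, Map("fs.s3.awsAccessKeyId" -> "example-key")) // creates and caches
    // A code path whose configuration lacks the key still gets the cached, authenticated client...
    println(factory.get(uri, Map.empty).accessKey) // prints Some(example-key)
    // ...but with caching disabled, a fresh client is built from that configuration.
    println(factory.get(uri, Map("fs.s3.impl.disable.cache" -> "true")).accessKey) // prints None
  }
}
```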
[jira] [Commented] (SPARK-21514) Hive has updated with new support for S3 and InsertIntoHiveTable.scala should update also
[ https://issues.apache.org/jira/browse/SPARK-21514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108882#comment-16108882 ] Steve Loughran commented on SPARK-21514: Can you link this JIRA to the specific HIVE work? > Hive has updated with new support for S3 and InsertIntoHiveTable.scala should > update also > - > > Key: SPARK-21514 > URL: https://issues.apache.org/jira/browse/SPARK-21514 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Javier Ros > > Hive has updated adding new parameters to optimize the usage of S3, now you > can avoid the usage of S3 as the stagingdir using the parameters > hive.blobstore.supported.schemes & hive.blobstore.optimizations.enabled. > The InsertIntoHiveTable.scala file should be updated with the same > improvement to match the behavior of Hive.
[jira] [Commented] (SPARK-21618) http(s) not accepted in spark-submit jar uri
[ https://issues.apache.org/jira/browse/SPARK-21618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114379#comment-16114379 ] Steve Loughran commented on SPARK-21618: Yes, and that 2.9+ feature breaks things: when you ask for an http or https connection, you get back a Hadoop wrapper class, which is not what other code (e.g. WASB) expects, so its attempts to cast the connection to the standard java.net base class fail. This doesn't surface in any shipping Hadoop release (or HDP/CDH/EMR), and a fix is in progress. > http(s) not accepted in spark-submit jar uri > > > Key: SPARK-21618 > URL: https://issues.apache.org/jira/browse/SPARK-21618 > Project: Spark > Issue Type: Bug > Components: Deploy >Affects Versions: 2.1.1, 2.2.0 > Environment: pre-built for hadoop 2.6 and 2.7 on mac and ubuntu > 16.04. >Reporter: Ben Mayne >Priority: Minor > Labels: documentation > > The documentation suggests I should be able to use an http(s) uri for a jar > in spark-submit, but I haven't been successful > https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management > {noformat} > benmayne@Benjamins-MacBook-Pro ~ $ spark-submit --deploy-mode client --master > local[2] --class class.name.Test https://test.com/path/to/jar.jar > log4j:WARN No appenders could be found for logger > (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). > log4j:WARN Please initialize the log4j system properly. > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more > info.
> Exception in thread "main" java.io.IOException: No FileSystem for scheme: > https > at > org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586) > at > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593) > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) > at > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632) > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) > at > org.apache.spark.deploy.SparkSubmit$.downloadFile(SparkSubmit.scala:865) > at > org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$1.apply(SparkSubmit.scala:316) > at > org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$1.apply(SparkSubmit.scala:316) > at scala.Option.map(Option.scala:146) > at > org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:316) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > benmayne@Benjamins-MacBook-Pro ~ $ > {noformat} > If I replace the path with a valid hdfs path > (hdfs:///user/benmayne/valid-jar.jar), it works as expected. I've seen the > same behavior across 2.2.0 (hadoop 2.6 & 2.7 on mac and ubuntu) and on 2.1.1 > on ubuntu. > this is the example that I'm trying to replicate from > https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management: > > > Spark uses the following URL scheme to allow different strategies for > > disseminating jars: > > file: - Absolute paths and file:/ URIs are served by the driver’s HTTP file > > server, and every executor pulls the file from the driver HTTP server. 
> > hdfs:, http:, https:, ftp: - these pull down files and JARs from the URI as > > expected > {noformat} > # Run on a Mesos cluster in cluster deploy mode with supervise > ./bin/spark-submit \ > --class org.apache.spark.examples.SparkPi \ > --master mesos://207.184.161.138:7077 \ > --deploy-mode cluster \ > --supervise \ > --executor-memory 20G \ > --total-executor-cores 100 \ > http://path/to/examples.jar \ > 1000 > {noformat}
[jira] [Created] (SPARK-21762) FileFormatWriter metrics collection fails if a newly close()d file isn't yet visible
Steve Loughran created SPARK-21762: -- Summary: FileFormatWriter metrics collection fails if a newly close()d file isn't yet visible Key: SPARK-21762 URL: https://issues.apache.org/jira/browse/SPARK-21762 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0 Environment: object stores without complete creation consistency (this includes AWS S3's caching of negative GET results) Reporter: Steve Loughran Priority: Minor The metrics collection of SPARK-20703 can trigger premature failure if the newly written object isn't actually visible yet, that is, if after {{writer.close()}} a {{getFileStatus(path)}} call raises a {{FileNotFoundException}}. Strictly speaking, not having a file immediately visible goes against the fundamental expectations of the Hadoop FS APIs, namely fully consistent data and metadata across all operations, with immediate global visibility of all changes. However, not all object stores make that guarantee, be it for newly created data or for updated blobs. And so spurious FNFEs can get raised, ones which *should* have gone away by the time the actual task is committed; if they haven't, the job is in deep trouble anyway. What to do?
# Leave as is: fail fast, and so catch blobstores/blobstore clients which don't behave as required. One issue here: will that trigger retries, what happens there, etc.
# Swallow the FNFE and hope the file is observable later.
# Swallow all IOEs and hope that whatever problem the FS has is transient.
Options 2 & 3 aren't going to collect metrics in the event of an FNFE, or at least not the counter of bytes written.
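A minimal sketch of how the three options could be expressed; it is not Spark's code and every name is hypothetical. {{lengthOf}} stands in for {{getFileStatus(path).getLen}}, and the two callbacks stand in for a logger. The logging follows the split floated on SPARK-20703: a missing file is logged at debug level, while other IOExceptions get a one-line message at info with the full stack trace only at debug, so raising the log level is enough to see it in production.

```scala
import java.io.{FileNotFoundException, IOException, PrintWriter, StringWriter}

object WriteMetricsPolicy {

  sealed trait Policy
  /** Option 1: let every exception propagate, failing the task. */
  case object FailFast extends Policy
  /** Option 2: a missing file counts as 0 bytes; other IOExceptions still propagate. */
  case object SwallowNotFound extends Policy
  /** Option 3: any IOException counts as 0 bytes, on the assumption it is transient. */
  case object SwallowAllIO extends Policy

  /** Size of a just-closed output file, for a bytes-written metric. */
  def bytesWritten(path: String, policy: Policy, lengthOf: String => Long,
                   debug: String => Unit, info: String => Unit): Long =
    try lengthOf(path)
    catch {
      case e: FileNotFoundException if policy != FailFast =>
        debug(s"$path not yet visible: $e") // expected on some object stores
        0L
      case e: IOException if policy == SwallowAllIO =>
        info(s"Failed to get the size of $path: $e") // one-line summary by default
        debug(stackTrace(e))                         // full stack only when debugging
        0L
    }

  private def stackTrace(e: Throwable): String = {
    val out = new StringWriter()
    e.printStackTrace(new PrintWriter(out))
    out.toString
  }
}
```

Because {{FileNotFoundException}} is a subclass of {{IOException}}, the more specific case must come first; under {{FailFast}} neither guard matches, so the exception propagates unchanged.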
[jira] [Commented] (SPARK-20370) create external table on read only location fails
[ https://issues.apache.org/jira/browse/SPARK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993405#comment-15993405 ] Steve Loughran commented on SPARK-20370: Is this the bit under the PR tagged "!! HACK ALERT !!" by any chance? If so, it seems to have gone in for a Hive metastore workaround. I wonder if there is/can be a solution in Hive-land. > create external table on read only location fails > - > > Key: SPARK-20370 > URL: https://issues.apache.org/jira/browse/SPARK-20370 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.1.0 > Environment: EMR 5.4.0, hadoop 2.7.3, spark 2.1.0 >Reporter: Gaurav Shah >Priority: Minor > > Create External table via following fails: > sqlContext.createExternalTable( > "table_name", > "org.apache.spark.sql.parquet", > inputSchema, > Map( >"path" -> "s3a://bucket-name/folder", >"mergeSchema" -> "false" > ) >) > Spark in the following commit tries to check if it has write access to giving > location, which fails and so the table meta creation fails. > https://github.com/apache/spark/pull/13270/files > The table creation script works even if cluster has read only access in spark > 1.6, but fails in spark 2.0
[jira] [Commented] (SPARK-20560) Review Spark's handling of filesystems returning "localhost" in getFileBlockLocations
[ https://issues.apache.org/jira/browse/SPARK-20560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993323#comment-15993323 ] Steve Loughran commented on SPARK-20560: To follow this up, I've now got a test which verifies that (a) S3A returns "localhost" and (b) Spark discards it. This'll catch any regressions in the S3A client.
{code}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.rdd.HadoopRDD

// CSV_TESTFILE and getFilesystem() are helpers from the surrounding test suite
val source = CSV_TESTFILE.get
val fs = getFilesystem(source)

// (a) S3A reports one block location for the first byte, on host "localhost"
val blockLocations = fs.getFileBlockLocations(source, 0, 1)
assert(1 === blockLocations.length,
  s"block location array size wrong: ${blockLocations.mkString(", ")}")
val hosts = blockLocations(0).getHosts
assert(1 === hosts.length, s"wrong host size ${hosts.mkString(", ")}")
assert("localhost" === hosts(0), "hostname")

// (b) Spark must not turn "localhost" into a preferred location
val path = source.toString
val rdd = sc.hadoopFile[LongWritable, Text, TextInputFormat](path, 1)
val input = rdd.asInstanceOf[HadoopRDD[_, _]]
val partitions = input.getPartitions
val locations = input.getPreferredLocations(partitions.head)
assert(locations.isEmpty, s"Location list not empty ${locations}")
{code}
> Review Spark's handling of filesystems returning "localhost" in > getFileBlockLocations > - > > Key: SPARK-20560 > URL: https://issues.apache.org/jira/browse/SPARK-20560 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Steve Loughran >Priority: Minor > > Some filesystems (S3a, Azure WASB) return "localhost" as the response to > {{FileSystem.getFileBlockLocations(path)}}. If this is then used as the > preferred host when scheduling work, there's a risk that work will be queued > on one host, rather than spread across the cluster. > HIVE-14060 and TEZ-3291 have both seen it in their schedulers. > I don't know if Spark does it, someone needs to look at the code, maybe write > some tests
[jira] [Commented] (SPARK-20608) Standby namenodes should be allowed to included in yarn.spark.access.namenodes to support HDFS HA
[ https://issues.apache.org/jira/browse/SPARK-20608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006454#comment-16006454 ] Steve Loughran commented on SPARK-20608: One thing to consider here is starting with a test to see what happens: https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java shows how to set up a MiniDFSCluster to run HA. But yes, Marcelo is right: you don't use hdfs://hostname1:8000/; you use the logical cluster name, {{hdfs://hacluster/}}, and rely on the NameNodes listed in {{dfs.ha.namenodes.hacluster}} to tell HDFS which hosts to try to talk to. I suspect it's important to have a cluster name which is not a resolvable hostname, as that may confuse the client (just a guess; I haven't looked at the source to see). > Standby namenodes should be allowed to included in > yarn.spark.access.namenodes to support HDFS HA > - > > Key: SPARK-20608 > URL: https://issues.apache.org/jira/browse/SPARK-20608 > Project: Spark > Issue Type: Improvement > Components: Spark Submit, YARN >Affects Versions: 2.0.1, 2.1.0 >Reporter: Yuechen Chen >Priority: Minor > Original Estimate: 672h > Remaining Estimate: 672h > > If one Spark Application need to access remote namenodes, > yarn.spark.access.namenodes should be only be configged in spark-submit > scripts, and Spark Client(On Yarn) would fetch HDFS credential periodically. > If one hadoop cluster is configured by HA, there would be one active namenode > and at least one standby namenode. > However, if yarn.spark.access.namenodes includes both active and standby > namenodes, Spark Application will be failed for the reason that the standby > namenode would not access by Spark for org.apache.hadoop.ipc.StandbyException. > I think it won't cause any bad effect to config standby namenodes in > yarn.spark.access.namenodes, and my Spark Application can be able to sustain > the failover of Hadoop namenode.
> HA Examples: > Spark-submit script: > yarn.spark.access.namenodes=hdfs://namenode01,hdfs://namenode02 > Spark Application Codes: > dataframe.write.parquet(getActiveNameNode(...) + hdfsPath)
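For reference, a sketch of the client-side settings behind a logical {{hdfs://hacluster/}} URI of the kind Steve describes. The property names are the standard HDFS HA client keys; the nameservice name, the NameNode IDs {{nn1}}/{{nn2}}, the hostnames and port 8020 are assumptions for illustration, and {{HaClientConfig}} is a hypothetical helper. With these set, applications address the cluster only by its logical name, and the client fails over between the listed NameNodes, so nothing like {{getActiveNameNode(...)}} is needed.

```scala
object HaClientConfig {

  /** Client-side HDFS HA properties for one logical nameservice. */
  def haProperties(nameservice: String, namenodes: Seq[(String, String)]): Map[String, String] = {
    val ids = namenodes.map(_._1)
    Map(
      "dfs.nameservices" -> nameservice,
      // the NameNode IDs making up the nameservice
      s"dfs.ha.namenodes.$nameservice" -> ids.mkString(","),
      // the class the client uses to find the active NameNode and fail over
      s"dfs.client.failover.proxy.provider.$nameservice" ->
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    ) ++ namenodes.map { case (id, hostPort) =>
      // the RPC address of each NameNode ID
      s"dfs.namenode.rpc-address.$nameservice.$id" -> hostPort
    }
  }

  def main(args: Array[String]): Unit = {
    val props = haProperties("hacluster",
      Seq("nn1" -> "namenode01:8020", "nn2" -> "namenode02:8020"))
    // In Spark these could be passed as --conf spark.hadoop.<key>=<value>
    props.toSeq.sorted.foreach { case (k, v) => println(s"$k=$v") }
  }
}
```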
[jira] [Commented] (SPARK-21074) Parquet files are read fully even though only count() is requested
[ https://issues.apache.org/jira/browse/SPARK-21074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056309#comment-16056309 ] Steve Loughran commented on SPARK-21074: Given this is an s3:// URL, S3 may be amplifying the problem, with issues such as HADOOP-11570 and HADOOP-12376: HTTP/1.1 clients optimising prematurely for connection reuse by reading all the way to the end of the stream. An open(0) followed by a seek(end - a bit) would cause them to read all the way through the file first, then issue a new GET. Assuming this is the Amazon EMR S3 client, I can't say whether it has the same issue.
# Test with local files to see if the problem goes away. If it doesn't, the problem is in Parquet.
# Move to Hadoop 2.7+ JARs and s3a:// URLs. Better yet, drop in the 2.8 binaries and the matching AWS SDK to get the high-performance seek code, including the metrics you get to see when you call toString() on the FS instance.
> Parquet files are read fully even though only count() is requested > -- > > Key: SPARK-21074 > URL: https://issues.apache.org/jira/browse/SPARK-21074 > Project: Spark > Issue Type: Improvement > Components: Optimizer >Affects Versions: 2.1.0 >Reporter: Michael Spector > > I have the following sample code that creates parquet files: > {code:java} > val spark = SparkSession.builder() > .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", > "2") > .config("spark.hadoop.parquet.metadata.read.parallelism", "50") > .appName("Test Write").getOrCreate() > val sqc = spark.sqlContext > import sqc.implicits._ > val random = new scala.util.Random(31L) > (1465720077 to 1465720077+1000).map(x => Event(x, random.nextString(2))) > .toDS() > .write > .mode(SaveMode.Overwrite) > .parquet("s3://my-bucket/test") > {code} > Afterwards, I'm trying to read these files with the following code: > {code:java} > val spark = SparkSession.builder() > .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", > "2") > 
.config("spark.hadoop.parquet.metadata.read.parallelism", "50") > .config("spark.sql.parquet.filterPushdown", "true") > .appName("Test Read").getOrCreate() > spark.sqlContext.read > .option("mergeSchema", "false") > .parquet("s3://my-bucket/test") > .count() > {code} > I've enabled DEBUG log level to see what requests are actually sent through > S3 API, and I've figured out that in addition to parquet "footer" retrieval > there are requests that ask for whole file content. > For example, this is full content request example: > {noformat} > 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET > /test/part-0-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet > HTTP/1.1[\r][\n]" > > 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes > 0-7472093/7472094[\r][\n]" > > 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: > 7472094[\r][\n]" > {noformat} > And this is partial request example for footer only: > {noformat} > 17/06/13 05:46:50 DEBUG headers: http-outgoing-2 >> GET > /test/part-0-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1 > > 17/06/13 05:46:50 DEBUG headers: http-outgoing-2 >> Range: > bytes=7472086-7472094 > ... 
> 17/06/13 05:46:50 DEBUG wire: http-outgoing-2 << "Content-Length: 8[\r][\n]" > > {noformat} > Here's what FileScanRDD prints: > {noformat} > 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: > s3://my-bucket/test/part-4-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, > range: 0-7473020, partition values: [empty row] > 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: > s3://my-bucket/test/part-00011-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, > range: 0-7472503, partition values: [empty row] > 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: > s3://my-bucket/test/part-6-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, > range: 0-7472501, partition values: [empty row] > 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: > s3://my-bucket/test/part-7-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, > range: 0-7473104, partition values: [empty row] > 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: > s3://my-bucket/test/part-3-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, > range: 0-7472458, partition values: [empty row] > 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: > s3://my-bucket/test/part-00012-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, > range: 0-7472594, partition values: [empty row] > 17/06/13 05:46:52 INFO FileScanRDD: Reading File path: > s3://my-bucket/test/part-1-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet, > range: 0-7472984, partition values: [empty row] > 17/06/13 05:46:52 INFO
[jira] [Commented] (SPARK-19111) S3 Mesos history upload fails silently if too large
[ https://issues.apache.org/jira/browse/SPARK-19111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16056516#comment-16056516 ] Steve Loughran commented on SPARK-19111: Follow-up for [~drcrallen]: Hadoop 2.8 is out the door with the JARs you need for the S3A block output. I'm curious why it's blocking the history server, though. SPARK-11373 adds metrics to the SHS, which may help. > S3 Mesos history upload fails silently if too large > --- > > Key: SPARK-19111 > URL: https://issues.apache.org/jira/browse/SPARK-19111 > Project: Spark > Issue Type: Bug > Components: EC2, Mesos, Spark Core >Affects Versions: 2.0.0 >Reporter: Charles Allen > > {code} > 2017-01-06T21:32:32,928 INFO [main] org.apache.spark.ui.SparkUI - Stopped > Spark web UI at http://REDACTED:4041 > 2017-01-06T21:32:32,938 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.jvmGCTime > 2017-01-06T21:32:32,939 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.shuffle.read.localBlocksFetched > 2017-01-06T21:32:32,939 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.resultSerializationTime > 2017-01-06T21:32:32,939 ERROR [heartbeat-receiver-event-loop-thread] > org.apache.spark.scheduler.LiveListenerBus - SparkListenerBus has already > stopped! 
Dropping event SparkListenerExecutorMetricsUpdate( > 364,WrappedArray()) > 2017-01-06T21:32:32,939 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.resultSize > 2017-01-06T21:32:32,939 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.peakExecutionMemory > 2017-01-06T21:32:32,939 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.shuffle.read.fetchWaitTime > 2017-01-06T21:32:32,939 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.memoryBytesSpilled > 2017-01-06T21:32:32,940 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.shuffle.read.remoteBytesRead > 2017-01-06T21:32:32,940 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.diskBytesSpilled > 2017-01-06T21:32:32,940 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.shuffle.read.localBytesRead > 2017-01-06T21:32:32,940 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.shuffle.read.recordsRead > 2017-01-06T21:32:32,940 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.executorDeserializeTime > 2017-01-06T21:32:32,940 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: output/bytes > 2017-01-06T21:32:32,941 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.executorRunTime > 2017-01-06T21:32:32,941 INFO [SparkListenerBus] > com.metamx.starfire.spark.SparkDriver - emitting metric: > internal.metrics.shuffle.read.remoteBlocksFetched > 2017-01-06T21:32:32,943 INFO [main] > org.apache.hadoop.fs.s3native.NativeS3FileSystem - OutputStream for key > 
'eventLogs/remnant/46bf8f87-6de6-4da8-9cba-5b2fecd0875e-1387.inprogress' > closed. Now beginning upload > 2017-01-06T21:32:32,963 ERROR [heartbeat-receiver-event-loop-thread] > org.apache.spark.scheduler.LiveListenerBus - SparkListenerBus has already > stopped! Dropping event SparkListenerExecutorMetricsUpdate(905,WrappedArray()) > 2017-01-06T21:32:32,973 ERROR [heartbeat-receiver-event-loop-thread] > org.apache.spark.scheduler.LiveListenerBus - SparkListenerBus has already > stopped! Dropping event SparkListenerExecutorMetricsUpdate(519,WrappedArray()) > 2017-01-06T21:32:32,988 ERROR [heartbeat-receiver-event-loop-thread] > org.apache.spark.scheduler.LiveListenerBus - SparkListenerBus has already > stopped! Dropping event SparkListenerExecutorMetricsUpdate(596,WrappedArray()) > {code} > Running spark on mesos, some large jobs fail to upload to the history server > storage! > A successful sequence of events in the log that yield an upload are as > follows: > {code} > 2017-01-06T19:14:32,925 INFO [main] > org.apache.hadoop.fs.s3native.NativeS3FileSystem - OutputStream for key > 'eventLogs/remnant/46bf8f87-6de6-4da8-9cba-5b2fecd0875e-1434.inprogress' > writing to tempfile '/mnt/tmp/hadoop/output-2516573909248961808.tmp' > 2017-01-06T21:59:14,789 INFO [main] > org.apache.hadoop.fs.s3native.NativeS3FileSystem - OutputStream for key >
[jira] [Commented] (SPARK-11373) Add metrics to the History Server and providers
[ https://issues.apache.org/jira/browse/SPARK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16059141#comment-16059141 ] Steve Loughran commented on SPARK-11373: Metrics might help with understanding the S3 load issues in SPARK-19111. > Add metrics to the History Server and providers > --- > > Key: SPARK-11373 > URL: https://issues.apache.org/jira/browse/SPARK-11373 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Steve Loughran > > The History server doesn't publish metrics about JVM load or anything from > the history provider plugins. This means that performance problems from > massive job histories aren't visible to management tools, and nor are any > provider-generated metrics such as time to load histories, failed history > loads, the number of connectivity failures talking to remote services, etc. > If the history server set up a metrics registry and offered the option to > publish its metrics, then management tools could view this data. > # the metrics registry would need to be passed down to the instantiated > {{ApplicationHistoryProvider}}, in order for it to register its metrics. > # if the codahale metrics servlet were registered under a path such as > {{/metrics}}, the values would be visible as HTML and JSON, without the need > for management tools. > # Integration tests could also retrieve the JSON-formatted data and use it as > part of the test suites. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
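A rough Scala sketch of the first two points in the description: the server creates one registry, hands it to the provider when instantiating it, and the provider registers its own counters; {{snapshot}} is the kind of data a {{/metrics}} endpoint could render as JSON. {{ToyRegistry}}, {{ToyHistoryProvider}} and the metric names are hypothetical stand-ins, not the Codahale {{MetricRegistry}} API or Spark's {{ApplicationHistoryProvider}}.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

// Hypothetical, minimal stand-in for a metrics registry (not the Codahale API).
final class ToyRegistry {
  private val counters = TrieMap.empty[String, AtomicLong]

  // Return the named counter, creating it on first use.
  def counter(name: String): AtomicLong =
    counters.getOrElseUpdate(name, new AtomicLong(0L))

  // Point-in-time copy of all counter values, e.g. for rendering as JSON.
  def snapshot: Map[String, Long] =
    counters.iterator.map { case (name, c) => name -> c.get }.toMap
}

// Hypothetical history provider: it is handed the registry when it is
// instantiated and registers its own metrics, as the issue proposes.
final class ToyHistoryProvider(registry: ToyRegistry) {
  private val loads    = registry.counter("history.provider.loads")
  private val failures = registry.counter("history.provider.load.failures")

  def loadHistory(appId: String, succeeded: Boolean): Unit = {
    loads.incrementAndGet()
    if (!succeeded) failures.incrementAndGet()
  }
}
```

Because the registry is created outside the provider, the server rather than each plugin decides whether and where metrics are published.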
[jira] [Commented] (SPARK-20799) Unable to infer schema for ORC on reading ORC from S3
[ https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017258#comment-16017258 ] Steve Loughran commented on SPARK-20799: bq. Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login details. This is insecure and may be unsupported in future., but this should not mean that it shouldn't work anymore. It probably will stop working at some point in the future, as putting secrets in URIs is too dangerous: everything logs them, assuming they aren't sensitive data. The {{S3xLoginHelper}} not only warns you, it makes a best-effort attempt to strip the secrets out of the public URI, hence the logs and the messages telling you off. Prior to Hadoop 2.8, the sole *defensible* use case for secrets in URIs was that it was the only way to have different logins on different buckets. In Hadoop 2.8 we added the ability to configure any of the fs.s3a. options on a per-bucket basis, including the secret logins, endpoints, and other important values. I see what may be happening, in which case it probably constitutes a Hadoop regression: if the filesystem's URI is converted to a string, the secrets are stripped from it, so if something goes path -> URI -> String -> path, the secrets will be lost. If you are seeing this message, it means you are using Hadoop 2.8 or something else with the HADOOP-3733 patch in it. What version of Hadoop (or HDP, CDH, ...) are you using? If it is based on the full Apache 2.8 release, you get # per-bucket config to allow you to [configure each bucket separately|http://hadoop.apache.org/docs/r2.8.0/hadoop-aws/tools/hadoop-aws/index.html#Configurations_different_S3_buckets] # the ability to use JCEKS files to keep the secrets out of the configs # session token support.
Accordingly, if you tell me the version, I may be able to look at what's happening in a bit more detail. > Unable to infer schema for ORC on reading ORC from S3 > - > > Key: SPARK-20799 > URL: https://issues.apache.org/jira/browse/SPARK-20799 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jork Zijlstra > > We are getting the following exception: > {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. > It must be specified manually.{code} > Combining following factors will cause it: > - Use S3 > - Use format ORC > - Don't apply a partitioning on de data > - Embed AWS credentials in the path > The problem is in the PartitioningAwareFileIndex def allFiles() > {code} > leafDirToChildrenFiles.get(qualifiedPath) > .orElse { leafFiles.get(qualifiedPath).map(Array(_)) } > .getOrElse(Array.empty) > {code} > leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the > qualifiedPath contains the path WITH credentials. > So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no > data is read and the schema cannot be defined. > Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login > details. This is insecure and may be unsupported in future., but this should > not mean that it shouldn't work anymore. > Workaround: > Move the AWS credentials from the path to the SparkSession > {code} > SparkSession.builder > .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId}) > .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey}) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
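To illustrate the path -> URI -> String mismatch discussed in this issue, here is a simplified Scala model that uses only {{java.net.URI}}: an index is keyed on the credential-free form of a directory, and a lookup with the credential-bearing form finds nothing, which is how {{allFiles()}} ends up with no files and so no schema. {{stripLogin}} is a hypothetical helper approximating what the comment says {{S3xLoginHelper}} does; it is not Hadoop's implementation, and plain strings stand in for Hadoop {{Path}} keys.

```scala
import java.net.URI

// Simplified, hypothetical model of the lookup in this issue: strings stand in
// for Hadoop Path keys, and no Hadoop or Spark classes are used.
object CredentialKeyMismatch {

  // Drop the "user:secret@" part of a URI, approximating the best-effort
  // stripping the comment attributes to S3xLoginHelper.
  def stripLogin(uri: URI): URI =
    new URI(uri.getScheme, null, uri.getHost, uri.getPort,
      uri.getPath, uri.getQuery, uri.getFragment)

  // Index keyed on the stripped directory, like leafDirToChildrenFiles.
  def buildIndex(dir: URI, files: Array[String]): Map[String, Array[String]] =
    Map(stripLogin(dir).toString -> files)

  // Lookup with the raw, credential-bearing path: the key never matches,
  // so no files are found and there is nothing to infer a schema from.
  def lookupRaw(index: Map[String, Array[String]], dir: URI): Array[String] =
    index.getOrElse(dir.toString, Array.empty[String])

  // Lookup after normalising the key the same way the index was built.
  def lookupStripped(index: Map[String, Array[String]], dir: URI): Array[String] =
    index.getOrElse(stripLogin(dir).toString, Array.empty[String])
}
```

Normalising keys on both sides, or keeping secrets out of paths altogether as the comment recommends, avoids the mismatch.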
[jira] [Commented] (SPARK-20799) Unable to infer schema for ORC on reading ORC from S3
[ https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022807#comment-16022807 ] Steve Loughran commented on SPARK-20799: If what I think is happening is what's happening, then it's the security tightening of HADOOP-3733 that has stopped this working. It is sort of a regression, but as it has a security benefit ("stops leaking your secrets through logs"), it's not something we want to revert. Anyway, it *never* worked if you had a "/" in your secret key, so the sole reason it worked for you in the past is that you don't (see: I know something about your secret credentials:) Hadoop 2.8 is way better for S3A support all round, so I'd encourage you to stay and play. In particular, # switch from s3n:// to s3a:// for your URLs, to get the new high-performance client # try setting {{fs.s3a.experimental.input.fadvise=random}} in your settings; you should see a significant speedup in ORC input. If the use case here is that you want to use separate credentials for a specific bucket, you can now use per-bucket config {code} fs.s3a.bucket.site-2.access.key=my access key fs.s3a.bucket.site-2.secret.key=my access secret {code} Then, when you refer to {{s3a://site-2/path}}, the specific key & secret for that bucket are picked up, so you shouldn't need to use inline secrets at all. > Unable to infer schema for ORC on reading ORC from S3 > - > > Key: SPARK-20799 > URL: https://issues.apache.org/jira/browse/SPARK-20799 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 >Reporter: Jork Zijlstra > > We are getting the following exception: > {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC.
> It must be specified manually.{code} > Combining following factors will cause it: > - Use S3 > - Use format ORC > - Don't apply a partitioning on de data > - Embed AWS credentials in the path > The problem is in the PartitioningAwareFileIndex def allFiles() > {code} > leafDirToChildrenFiles.get(qualifiedPath) > .orElse { leafFiles.get(qualifiedPath).map(Array(_)) } > .getOrElse(Array.empty) > {code} > leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the > qualifiedPath contains the path WITH credentials. > So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no > data is read and the schema cannot be defined. > Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login > details. This is insecure and may be unsupported in future., but this should > not mean that it shouldn't work anymore. > Workaround: > Move the AWS credentials from the path to the SparkSession > {code} > SparkSession.builder > .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId}) > .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey}) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
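The per-bucket lookup described in the comment can be pictured with a small Scala model: for a given bucket, every {{fs.s3a.bucket.<bucket>.<option>}} value replaces the matching {{fs.s3a.<option>}} value, and other buckets keep the base settings. This is a hypothetical simplification of the behaviour the comment describes, not Hadoop's code; the bucket names and values are placeholders.

```scala
// Hypothetical model of per-bucket S3A option resolution; not Hadoop's code.
object PerBucketConfig {
  private val Prefix = "fs.s3a."
  private val BucketPrefix = "fs.s3a.bucket."

  // Settings for one bucket: base fs.s3a.* values, overridden by any
  // fs.s3a.bucket.<bucket>.<option> entries for that bucket only.
  def resolve(conf: Map[String, String], bucket: String): Map[String, String] = {
    // Trailing "." so that bucket "site-2" does not also match "site-20".
    val bucketKeyPrefix = s"$BucketPrefix$bucket."
    val overrides = conf.collect {
      case (key, value) if key.startsWith(bucketKeyPrefix) =>
        (Prefix + key.stripPrefix(bucketKeyPrefix)) -> value
    }
    conf ++ overrides
  }
}
```

With such a resolution step, {{s3a://site-2/path}} picks up the site-2 key and secret (and, here, a per-bucket fadvise setting) while every other bucket keeps the defaults, which is why inline secrets become unnecessary.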
[jira] [Updated] (SPARK-20799) Unable to infer schema for ORC on S3N when secrets are in the URL
[ https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran updated SPARK-20799: --- Summary: Unable to infer schema for ORC on S3N when secrets are in the URL (was: Unable to infer schema for ORC on reading ORC from S3) > Unable to infer schema for ORC on S3N when secrets are in the URL > - > > Key: SPARK-20799 > URL: https://issues.apache.org/jira/browse/SPARK-20799 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 >Reporter: Jork Zijlstra > > We are getting the following exception: > {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. > It must be specified manually.{code} > Combining following factors will cause it: > - Use S3 > - Use format ORC > - Don't apply a partitioning on de data > - Embed AWS credentials in the path > The problem is in the PartitioningAwareFileIndex def allFiles() > {code} > leafDirToChildrenFiles.get(qualifiedPath) > .orElse { leafFiles.get(qualifiedPath).map(Array(_)) } > .getOrElse(Array.empty) > {code} > leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the > qualifiedPath contains the path WITH credentials. > So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no > data is read and the schema cannot be defined. > Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login > details. This is insecure and may be unsupported in future., but this should > not mean that it shouldn't work anymore. > Workaround: > Move the AWS credentials from the path to the SparkSession > {code} > SparkSession.builder > .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId}) > .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey}) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19669) Open up visibility for sharedState, sessionState, and a few other functions
[ https://issues.apache.org/jira/browse/SPARK-19669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16026219#comment-16026219 ] Steve Loughran commented on SPARK-19669: Thanks for this; it's very nice to have Logging usable outside Spark's own codebase again. I had my own copy & paste, but when you start playing with traits across things, life gets hard. > Open up visibility for sharedState, sessionState, and a few other functions > --- > > Key: SPARK-19669 > URL: https://issues.apache.org/jira/browse/SPARK-19669 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.1.0 >Reporter: Reynold Xin >Assignee: Reynold Xin > Fix For: 2.2.0 > > > To ease debugging, most of Spark SQL internals have public level visibility. > Two of the most important internal states, sharedState and sessionState, > however, are package private. It would make more sense to open these up as > well with clear documentation that they are internal. > In addition, users currently have way to set active/default SparkSession, but > no way to actually get them back. We should open those up as well. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8578) Should ignore user defined output committer when appending data
[ https://issues.apache.org/jira/browse/SPARK-8578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16024526#comment-16024526 ] Steve Loughran commented on SPARK-8578: --- Given that SPARK-10063 has pulled the {{DirectParquetOutputCommitter}} because 'direct' output is incompatible with the 'commit protocol', it may be time to revisit this. > Should ignore user defined output committer when appending data > --- > > Key: SPARK-8578 > URL: https://issues.apache.org/jira/browse/SPARK-8578 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.4.0 >Reporter: Cheng Lian >Assignee: Yin Huai > Fix For: 1.4.1, 1.5.0 > > > When appending data to a file system via Hadoop API, it's safer to ignore > user defined output committer classes like {{DirectParquetOutputCommitter}}. > Because it's relatively hard to handle task failure in this case. For > example, {{DirectParquetOutputCommitter}} directly writes to the output > directory to boost write performance when working with S3. However, there's > no general way to determine task output file path of a specific task in > Hadoop API, thus we don't know to revert a failed append job. (When doing > overwrite, we can just remove the whole output directory.) -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
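A toy Scala model may help show why direct output is hard to undo on append: when each task attempt stages its files under its own temporary directory, aborting the attempt only has to delete that directory, whereas a task writing straight into the destination leaves files that cannot, in general, be told apart from pre-existing data. The in-memory map, the {{_temporary/<attempt>}} layout and the class names are illustrative assumptions, not Hadoop's {{FileOutputCommitter}}.

```scala
import scala.collection.mutable

// Toy, in-memory model: a "filesystem" is a mutable map from path to contents.
// Illustrative only; this is not Hadoop's FileOutputCommitter.
object CommitModels {
  type Fs = mutable.Map[String, String]

  // Staged output: files go under a per-attempt temporary directory and are
  // moved into the destination only when the task commits.
  final class StagedTask(fs: Fs, dest: String, attempt: String) {
    private val tmpPrefix = s"$dest/_temporary/$attempt/"

    def write(name: String, data: String): Unit = fs(tmpPrefix + name) = data

    def commitTask(): Unit =
      fs.keys.filter(_.startsWith(tmpPrefix)).toList.foreach { key =>
        fs(s"$dest/${key.stripPrefix(tmpPrefix)}") = fs(key)
        fs -= key
      }

    // Aborting only touches this attempt's own directory.
    def abortTask(): Unit =
      fs.keys.filter(_.startsWith(tmpPrefix)).toList.foreach(key => fs -= key)
  }

  // Direct output: files go straight into the destination. Nothing records
  // which files this attempt created, so there is no safe general cleanup
  // when appending to a directory that already holds data.
  final class DirectTask(fs: Fs, dest: String) {
    def write(name: String, data: String): Unit = fs(s"$dest/$name") = data
    def abortTask(): Unit = ()
  }
}
```

In the direct case, an aborted append leaves {{part-0.parquet}} next to the existing data with no record that it belongs to the failed attempt, which is the situation the description says cannot be reverted in general.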
[jira] [Commented] (SPARK-20886) HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null
[ https://issues.apache.org/jira/browse/SPARK-20886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16024886#comment-16024886 ] Steve Loughran commented on SPARK-20886: Stack trace: after {code} 2017-05-25 16:22:10,807 [dag-scheduler-event-loop] INFO scheduler.DAGScheduler (Logging.scala:logInfo(54)) - ResultStage 2 (apply at Transformer.scala:22) failed in 0.065 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:263) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:182) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:181) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalArgumentException: requirement failed: Committer has no workpath FileOutputCommitter{outputPath=file:/Users/stevel/Projects/sparkwork/cloud-integration/cloud-examples/target/tmp/spark-41b05e5f-93eb-4b1b-8e9d-7fd930641267, workPath=null, algorithmVersion=2, skipCleanup=false, ignoreCleanupFailures=false} at scala.Predef$.require(Predef.scala:224) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.newTaskTempFile(HadoopMapReduceCommitProtocol.scala:78) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:291) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:305) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:249) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1365) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:252) ... 8 more {code} > HadoopMapReduceCommitProtocol to fail with message if > FileOutputCommitter.getWorkPath==null > --- > > Key: SPARK-20886 > URL: https://issues.apache.org/jira/browse/SPARK-20886 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Steve Loughran >Priority: Trivial > > This is minor, and the root cause is my fault *elsewhere*, but its the patch > I used to track down the problem. > If {{HadoopMapReduceCommitProtocol}} has a {{FileOutputCommitter}} for > committing things, and *somehow* that's been configured with a > {{JobAttemptContext}}, not a {{TaskAttemptContext}}, then the committer NPEs. > A {{require()}} statement can validate the working path and so point the > blame at whoever's code is confused. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20886) HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null
[ https://issues.apache.org/jira/browse/SPARK-20886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16024885#comment-16024885 ] Steve Loughran commented on SPARK-20886: Stack trace: before {code} Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) ... 
Cause: org.apache.spark.SparkException: Task failed while writing rows at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:263) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:182) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:181) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ... Cause: java.lang.NullPointerException: at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.newTaskTempFile(HadoopMapReduceCommitProtocol.scala:76) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:291) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:305) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:249) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1365) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:252) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:182) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:181) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) {code} > HadoopMapReduceCommitProtocol to fail with message if > FileOutputCommitter.getWorkPath==null > --- > > Key: SPARK-20886 > URL: https://issues.apache.org/jira/browse/SPARK-20886 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Steve Loughran >Priority: Trivial > > This is minor, and the root cause is my fault *elsewhere*, but its the patch > I used to track down the problem. > If {{HadoopMapReduceCommitProtocol}} has a {{FileOutputCommitter}} for > committing things, and *somehow* that's been configured with a > {{JobAttemptContext}}, not a {{TaskAttemptContext}}, then the committer NPEs. > A {{require()}} statement can validate the working path and so point the > blame at whoever's code is confused. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20886) HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null
Steve Loughran created SPARK-20886: -- Summary: HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null Key: SPARK-20886 URL: https://issues.apache.org/jira/browse/SPARK-20886 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.3.0 Reporter: Steve Loughran Priority: Trivial This is minor, and the root cause is my fault *elsewhere*, but it's the patch I used to track down the problem. If {{HadoopMapReduceCommitProtocol}} has a {{FileOutputCommitter}} for committing things, and *somehow* that's been configured with a {{JobContext}}, not a {{TaskAttemptContext}}, then the committer NPEs. A {{require()}} statement can validate the working path and so point the blame at whoever's code is confused. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
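The proposed check can be sketched in Scala as below: failing fast with {{require()}} turns the later {{NullPointerException}} into an {{IllegalArgumentException}} whose message names the misconfigured committer, in the same "requirement failed: Committer has no workpath ..." form as the "after" stack trace posted on this issue. {{FakeCommitter}} and {{WorkPathCheck.newTaskTempFile}} are hypothetical stand-ins, not Spark's {{HadoopMapReduceCommitProtocol}} code.

```scala
// Hypothetical stand-in for a FileOutputCommitter. workPath is null when the
// committer was created from a job-level context instead of a task attempt.
final case class FakeCommitter(outputPath: String, workPath: String)

object WorkPathCheck {
  // Resolve a task's temporary output file, failing fast with a descriptive
  // message (instead of a later NullPointerException) if there is no work path.
  def newTaskTempFile(committer: FakeCommitter, filename: String): String = {
    val stagingDir = committer.workPath
    require(stagingDir != null, s"Committer has no workpath $committer")
    s"$stagingDir/$filename"
  }
}
```

Scala's {{require}} prefixes the message with "requirement failed: ", so the committer's own {{toString}} ends up in the task failure, which is what points the blame at the confused caller.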
[jira] [Updated] (SPARK-20799) Unable to infer schema for ORC on S3N when secrets are in the URL
[ https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran updated SPARK-20799: --- Priority: Minor (was: Major) > Unable to infer schema for ORC on S3N when secrets are in the URL > - > > Key: SPARK-20799 > URL: https://issues.apache.org/jira/browse/SPARK-20799 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 >Reporter: Jork Zijlstra >Priority: Minor > > We are getting the following exception: > {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. > It must be specified manually.{code} > Combining following factors will cause it: > - Use S3 > - Use format ORC > - Don't apply a partitioning on de data > - Embed AWS credentials in the path > The problem is in the PartitioningAwareFileIndex def allFiles() > {code} > leafDirToChildrenFiles.get(qualifiedPath) > .orElse { leafFiles.get(qualifiedPath).map(Array(_)) } > .getOrElse(Array.empty) > {code} > leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the > qualifiedPath contains the path WITH credentials. > So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no > data is read and the schema cannot be defined. > Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login > details. This is insecure and may be unsupported in future., but this should > not mean that it shouldn't work anymore. > Workaround: > Move the AWS credentials from the path to the SparkSession > {code} > SparkSession.builder > .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId}) > .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey}) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20799) Unable to infer schema for ORC/Parquet on S3N when secrets are in the URL
[ https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran updated SPARK-20799: --- Summary: Unable to infer schema for ORC/Parquet on S3N when secrets are in the URL (was: Unable to infer schema for ORC on S3N when secrets are in the URL) > Unable to infer schema for ORC/Parquet on S3N when secrets are in the URL > - > > Key: SPARK-20799 > URL: https://issues.apache.org/jira/browse/SPARK-20799 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 >Reporter: Jork Zijlstra >Priority: Minor > > We are getting the following exception: > {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. > It must be specified manually.{code} > Combining following factors will cause it: > - Use S3 > - Use format ORC > - Don't apply a partitioning on de data > - Embed AWS credentials in the path > The problem is in the PartitioningAwareFileIndex def allFiles() > {code} > leafDirToChildrenFiles.get(qualifiedPath) > .orElse { leafFiles.get(qualifiedPath).map(Array(_)) } > .getOrElse(Array.empty) > {code} > leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the > qualifiedPath contains the path WITH credentials. > So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no > data is read and the schema cannot be defined. > Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login > details. This is insecure and may be unsupported in future., but this should > not mean that it shouldn't work anymore. 
> Workaround: > Move the AWS credentials from the path to the SparkSession > {code} > SparkSession.builder > .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId}) > .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey}) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20799) Unable to infer schema for ORC/Parquet on S3N when secrets are in the URL
[ https://issues.apache.org/jira/browse/SPARK-20799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran updated SPARK-20799: --- Environment: Hadoop 2.8.0 binaries > Unable to infer schema for ORC/Parquet on S3N when secrets are in the URL > - > > Key: SPARK-20799 > URL: https://issues.apache.org/jira/browse/SPARK-20799 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 > Environment: Hadoop 2.8.0 binaries >Reporter: Jork Zijlstra >Priority: Minor > > We are getting the following exception: > {code}org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. > It must be specified manually.{code} > Combining following factors will cause it: > - Use S3 > - Use format ORC > - Don't apply a partitioning on de data > - Embed AWS credentials in the path > The problem is in the PartitioningAwareFileIndex def allFiles() > {code} > leafDirToChildrenFiles.get(qualifiedPath) > .orElse { leafFiles.get(qualifiedPath).map(Array(_)) } > .getOrElse(Array.empty) > {code} > leafDirToChildrenFiles uses the path WITHOUT credentials as its key while the > qualifiedPath contains the path WITH credentials. > So leafDirToChildrenFiles.get(qualifiedPath) doesn't find any files, so no > data is read and the schema cannot be defined. > Spark does output the S3xLoginHelper:90 - The Filesystem URI contains login > details. This is insecure and may be unsupported in future., but this should > not mean that it shouldn't work anymore. > Workaround: > Move the AWS credentials from the path to the SparkSession > {code} > SparkSession.builder > .config("spark.hadoop.fs.s3n.awsAccessKeyId", {awsAccessKeyId}) > .config("spark.hadoop.fs.s3n.awsSecretAccessKey", {awsSecretAccessKey}) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21077) Cannot access public files over S3 protocol
[ https://issues.apache.org/jira/browse/SPARK-21077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051680#comment-16051680 ] Steve Loughran commented on SPARK-21077: As people say, this is inevitably a config problem. Hadoop 2.7.x has the credential provider you need; you should be able to read {{s3a://landsat-pds/scene_list.gz}} as a CSV file with anonymous credentials. > Cannot access public files over S3 protocol > --- > > Key: SPARK-21077 > URL: https://issues.apache.org/jira/browse/SPARK-21077 > Project: Spark > Issue Type: Bug > Components: EC2 >Affects Versions: 2.1.0 > Environment: Spark 2.1.0 default installation. No existing hadoop, > using the one distributed with Spark. > Added in $SPARK_HOME/jars: > hadoop-aws-2.7.3.jar and aws-java-sdk-1.7.4.jar > Added endpoint configuration in $SPARK_HOME/conf/core-site.xml (I want to > access datasets hosted by organisation with CEPH; follows S3 protocols). > Ubuntu 14.04 x64. >Reporter: Ciprian Tomoiaga > > I am trying to access a dataset with public (anonymous) credentials via the > S3 (or S3a, s3n) protocol. > It fails with the error that no provider in chain can supply the credentials. > I asked our sysadmin to add some dummy credentials, and if I set them up (via > link or config) then I have access. > I tried setting the config : > {code:xml} > <property> > <name>fs.s3a.credentials.provider</name> > <value>org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider</value> > </property> > {code} > but it still doesn't work. > I suggested that it is a java-aws issue > [here|https://github.com/aws/aws-sdk-java/issues/1122#issuecomment-307814540], > but they said it is not. > Any hints on how to use public S3 files from Spark ? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
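Assuming Hadoop 2.8 or later on the classpath, the S3A option that lists credential providers is {{fs.s3a.aws.credentials.provider}} (note the {{aws}} segment), not the {{fs.s3a.credentials.provider}} key tried in the description. The Scala sketch below only builds the settings; it relies on Spark copying {{spark.hadoop.*}} options into the Hadoop configuration, and the object and method names are hypothetical.

```scala
// Hypothetical helper: builds Spark settings for anonymous S3A reads.
object AnonymousS3AConf {
  // Assumes Hadoop 2.8+: note the "aws" segment in the option name.
  val hadoopOptions: Map[String, String] = Map(
    "fs.s3a.aws.credentials.provider" ->
      "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

  // Spark copies "spark.hadoop.<key>" settings into the Hadoop configuration
  // as "<key>", so prefixing lets these be passed as ordinary Spark config.
  def asSparkConf(opts: Map[String, String]): Map[String, String] =
    opts.map { case (key, value) => s"spark.hadoop.$key" -> value }
}
```

Each resulting pair could then be passed to {{SparkSession.builder.config(key, value)}} before reading {{s3a://landsat-pds/scene_list.gz}}. With the Hadoop 2.7.3 JARs listed in the environment, this key may not be read at all, so the sketch should be treated as applying to 2.8+ only.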
[jira] [Commented] (SPARK-7481) Add spark-hadoop-cloud module to pull in object store support
[ https://issues.apache.org/jira/browse/SPARK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000502#comment-16000502 ] Steve Loughran commented on SPARK-7481: --- thank you! > Add spark-hadoop-cloud module to pull in object store support > - > > Key: SPARK-7481 > URL: https://issues.apache.org/jira/browse/SPARK-7481 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 2.1.0 >Reporter: Steve Loughran >Assignee: Steve Loughran > Fix For: 2.3.0 > > > To keep the s3n classpath right, to add s3a, swift & azure, the dependencies > of spark in a 2.6+ profile need to add the relevant object store packages > (hadoop-aws, hadoop-openstack, hadoop-azure)
[jira] [Commented] (SPARK-20608) Standby namenodes should be allowed to included in yarn.spark.access.namenodes to support HDFS HA
[ https://issues.apache.org/jira/browse/SPARK-20608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15998150#comment-15998150 ] Steve Loughran commented on SPARK-20608: Probably good to pull in someone who understands HDFS HA; I nominate [~liuml07]. My main worry is that RemoteException could be a symptom of something more serious than the node being in standby, but I don't know enough about NN HA for my opinions to be trusted. > Standby namenodes should be allowed to included in > yarn.spark.access.namenodes to support HDFS HA > - > > Key: SPARK-20608 > URL: https://issues.apache.org/jira/browse/SPARK-20608 > Project: Spark > Issue Type: Improvement > Components: Spark Submit, YARN >Affects Versions: 2.0.1, 2.1.0 >Reporter: Yuechen Chen >Priority: Minor > Original Estimate: 672h > Remaining Estimate: 672h > > If a Spark application needs to access remote namenodes, > yarn.spark.access.namenodes only needs to be configured in the spark-submit > script, and the Spark client (on YARN) will fetch HDFS credentials periodically. > If a Hadoop cluster is configured for HA, there will be one active namenode > and at least one standby namenode. > However, if yarn.spark.access.namenodes includes both the active and standby > namenodes, the Spark application fails, because Spark cannot access the standby > namenode and gets an org.apache.hadoop.ipc.StandbyException. > I don't think configuring standby namenodes in > yarn.spark.access.namenodes would have any bad effect, and it would let my Spark application > survive a Hadoop namenode failover. > HA example: > spark-submit script: > yarn.spark.access.namenodes=hdfs://namenode01,hdfs://namenode02 > Spark application code: > dataframe.write.parquet(getActiveNameNode(...) + hdfsPath)
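The worry that a remote failure might mean more than "this node is standby" suggests tolerating only the standby case explicitly. A minimal sketch under stated assumptions: `StandbyNodeException` is a hypothetical stand-in for `org.apache.hadoop.ipc.StandbyException` (in real Hadoop IPC a server-side exception usually reaches the client wrapped in a `RemoteException`, which is why catching the wrapper wholesale is ambiguous), and `fetchFromAll` is an invented helper, not Spark's token-fetching code. Every other failure still propagates, and at least one namenode must answer.

```scala
object HaTokenFetch {
  /** Hypothetical stand-in for org.apache.hadoop.ipc.StandbyException. */
  final class StandbyNodeException(msg: String) extends Exception(msg)

  /**
   * Call `fetch` on every configured namenode. Only the standby case is
   * tolerated; any other failure propagates, because it may signal a real
   * problem rather than "this node is not active".
   */
  def fetchFromAll[T](nodes: Seq[String])(fetch: String => T): Seq[T] = {
    val results = nodes.flatMap { node =>
      try List(fetch(node))
      catch { case _: StandbyNodeException => Nil } // standby: expected in an HA pair
    }
    require(results.nonEmpty, s"no active namenode among: ${nodes.mkString(", ")}")
    results
  }
}
```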
[jira] [Created] (SPARK-20560) Review Spark's handling of filesystems returning "localhost" in getFileBlockLocations
Steve Loughran created SPARK-20560: -- Summary: Review Spark's handling of filesystems returning "localhost" in getFileBlockLocations Key: SPARK-20560 URL: https://issues.apache.org/jira/browse/SPARK-20560 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.1.0 Reporter: Steve Loughran Priority: Minor Some filesystems (S3a, Azure WASB) return "localhost" as the response to {{FileSystem.getFileBlockLocations(path)}}. If this is then used as the preferred host when scheduling work, there's a risk that work will be queued on one host, rather than spread across the cluster. HIVE-14060 and TEZ-3291 have both seen it in their schedulers. I don't know whether Spark does this; someone needs to look at the code, and maybe write some tests.
[jira] [Commented] (SPARK-20560) Review Spark's handling of filesystems returning "localhost" in getFileBlockLocations
[ https://issues.apache.org/jira/browse/SPARK-20560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993008#comment-15993008 ] Steve Loughran commented on SPARK-20560: {{FileSystem.getFileBlockLocations(path)}} is only invoked from {{HdfsUtils.getFileSegmentLocations}}, and used as a source of data for {{RDD.preferredLocations}}. I don't see anything explicit in the code that detects and reacts to the FS call returning localhost; I'll do some tests downstream to see what surfaces against S3. Unless the scheduler has some explicit "localhost -> anywhere" map, it might make sense for {{HdfsUtils.getFileSegmentLocations}} to downgrade "localhost" to None, on the basis that in a cluster FS, the data clearly doesn't know where it is.
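A minimal sketch of the downgrade suggested in this comment, assuming block-location hosts arrive as plain strings. `PreferredHosts` and its methods are hypothetical names, not Spark's `HdfsUtils` or `HadoopRDD` code. Dropping "localhost" can leave no hosts at all, which this sketch reports as `None`, meaning "no preference".

```scala
object PreferredHosts {
  /** Drop "localhost": from a shared object store it says nothing about data locality. */
  def usableHosts(blockHosts: Seq[String]): Seq[String] =
    blockHosts.filterNot(_.equalsIgnoreCase("localhost"))

  /** In this sketch, None means "no preference", leaving placement to the scheduler. */
  def preferredLocations(blockHosts: Seq[String]): Option[Seq[String]] =
    Some(usableHosts(blockHosts)).filter(_.nonEmpty)
}
```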
[jira] [Commented] (SPARK-19582) DataFrameReader conceptually inadequate
[ https://issues.apache.org/jira/browse/SPARK-19582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15992985#comment-15992985 ] Steve Loughran commented on SPARK-19582: All Spark is doing is taking a URL to data, mapping that to an FS implementation classname and expecting that to implement the methods in `org.apache.hadoop.fs.FileSystem` so as to provide FS-like behaviour. Given that minio is nominally an S3 clone, it sounds like there's a problem here setting up the Hadoop S3A client to bind to it. I'd isolate that to the Hadoop code before going near Spark: test on Hadoop 2.8 and file bugs against Hadoop and/or minio if there are problems. AFAIK, nobody has run the Hadoop S3A [tests|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md] against minio; doing that and documenting how to configure the client would be a welcome contribution. If minio is 100% S3 compatible (c3/v4 auth + multipart PUT; encryption optional), then the S3A client should work with it...it could work as another integration test for minio. > DataFrameReader conceptually inadequate > --- > > Key: SPARK-19582 > URL: https://issues.apache.org/jira/browse/SPARK-19582 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 2.1.0 >Reporter: James Q. Arnold > > DataFrameReader assumes it "understands" all data sources (local file system, > object stores, jdbc, ...). This seems limiting in the long term, imposing > both development costs to accept new sources and dependency issues for > existing sources (how to coordinate the XX jar for internal use vs. the XX > jar used by the application). Unless I have missed how this can be done > currently, an application with an unsupported data source cannot create the > required RDD for distribution. > I recommend at least providing a text API for supplying data. Let the > application provide data as a String (or char[] or ...)---not a path, but the > actual data.
Alternatively, provide interfaces or abstract classes the > application could provide to let the application handle external data > sources, without forcing all that complication into the Spark implementation. > I don't have any code to submit, but JIRA seemed like the most appropriate > place to raise the issue. > Finally, if I have overlooked how this can be done with the current API, a > new example would be appreciated. > Additional detail... > We use the minio object store, which provides an API compatible with AWS-S3. > A few configuration/parameter values differ for minio, but one can use the > AWS library in the application to connect to the minio server. > When trying to use minio objects through spark, the s3://xxx paths are > intercepted by spark and handed to hadoop. So far, I have been unable to > find the right combination of configuration values and parameters to > "convince" hadoop to forward the right information to work with minio. If I > could read the minio object in the application, and then hand the object > contents directly to spark, I could bypass hadoop and solve the problem. > Unfortunately, the underlying spark design prevents that. So, I see two > problems. > - Spark seems to have taken on the responsibility of "knowing" the API > details of all data sources. This seems iffy in the long run (and is the > root of my current problem). In the long run, it seems unwise to assume that > spark should understand all possible path names, protocols, etc. Moreover, > passing S3 paths to hadoop seems a little odd (why not go directly to AWS, > for example). This particular confusion about S3 shows the difficulties that > are bound to occur. > - Second, spark appears not to have a way to bypass the path name > interpretation. At the least, spark could provide a text/blob interface, > letting the application supply the data object and avoid path interpretation > inside spark. Alternatively, spark could accept a reader/stream/...
to build > the object, again letting the application provide the implementation of the > object input. > As I mentioned above, I might be missing something in the API that lets us > work around the problem. I'll keep looking, but the API as apparently > structured seems too limiting.
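Following the suggestion to get the Hadoop S3A client talking to minio before involving Spark, the options usually involved can be collected as below. This is a sketch under assumptions: the endpoint and keys are placeholders, `MinioS3AConf` is a hypothetical helper, and `fs.s3a.path.style.access` needs Hadoop 2.8 or later. The `spark.hadoop.` prefix is how Spark forwards options into the Hadoop configuration.

```scala
object MinioS3AConf {
  /**
   * Hadoop S3A options for an S3-compatible store such as minio. The endpoint
   * and keys are placeholders supplied by the caller.
   */
  def s3aSettings(endpoint: String, accessKey: String, secretKey: String): Map[String, String] =
    Map(
      "fs.s3a.endpoint" -> endpoint,
      // Address buckets as http://host/bucket rather than http://bucket.host (Hadoop 2.8+).
      "fs.s3a.path.style.access" -> "true",
      "fs.s3a.connection.ssl.enabled" -> endpoint.startsWith("https://").toString,
      "fs.s3a.access.key" -> accessKey,
      "fs.s3a.secret.key" -> secretKey
    )

  /** Prefix each Hadoop option so Spark copies it into the Hadoop Configuration. */
  def asSparkOptions(hadoopOptions: Map[String, String]): Map[String, String] =
    hadoopOptions.map { case (key, value) => s"spark.hadoop.$key" -> value }
}
```

Each resulting pair could then be passed to `SparkSession.builder.config(key, value)`, with the data addressed through `s3a://bucket/path` URLs.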
[jira] [Resolved] (SPARK-20560) Review Spark's handling of filesystems returning "localhost" in getFileBlockLocations
[ https://issues.apache.org/jira/browse/SPARK-20560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran resolved SPARK-20560. Resolution: Invalid. "localhost" is filtered out; this has been done in {{HadoopRDD.getPreferredLocations()}} since commit #06aac8a.
[jira] [Updated] (SPARK-21137) Spark reads many small files slowly off local filesystem
[ https://issues.apache.org/jira/browse/SPARK-21137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran updated SPARK-21137: --- Summary: Spark reads many small files slowly off local filesystem (was: Spark reads many small files slowly) > Spark reads many small files slowly off local filesystem > > > Key: SPARK-21137 > URL: https://issues.apache.org/jira/browse/SPARK-21137 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: sam >Priority: Minor > > A very common use case in big data is to read a large number of small files. > For example the Enron email dataset has 1,227,645 small files. > When one tries to read this data using Spark one will hit many issues. > Firstly, even if the data is small (each file only say 1K) any job can take a > very long time (I have a simple job that has been running for 3 hours and has > not yet got to the point of starting any tasks, I doubt if it will ever > finish). > It seems all the code in Spark that manages file listing is single threaded > and not well optimised. When I hand crank the code and don't use Spark, my > job runs much faster. > Is it possible that I'm missing some configuration option? It seems kinda > surprising to me that Spark cannot read Enron data given that it's such a > quintessential example. > So it takes 1 hour to output a line "1,227,645 input paths to process", it > then takes another hour to output the same line. Then it outputs a CSV of all > the input paths (so creates a text storm). > Now it's been stuck on the following: > {code} > 17/06/19 09:31:07 INFO LzoCodec: Successfully loaded & initialized native-lzo > library [hadoop-lzo rev 154f1ef53e2d6ed126b0957d7995e0a610947608] > {code} > for 2.5 hours. > So I've provided full reproduce steps here (including code and cluster setup) > https://github.com/samthebest/scenron, scroll down to "Bug In Spark". 
You can > easily just clone, and follow the README to reproduce exactly!
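The report contrasts Spark's listing with hand-written code that "runs much faster". As a rough illustration of what such hand-cranked listing might look like, here is a minimal sketch that lists each subdirectory on its own thread using `java.io.File` and Scala futures. It is an assumption-laden example (local filesystem only, one directory level deep, `ParallelListing` is a hypothetical name), not Spark's listing code.

```scala
import java.io.File
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelListing {
  // listFiles() returns null on an I/O error or if `dir` is not a directory.
  private def children(dir: File): Seq[File] =
    Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)

  /**
   * List the files directly under `root` plus the files one level down,
   * listing each subdirectory on a pool thread. Deeper trees would need recursion.
   */
  def listFiles(root: File, threads: Int): Seq[File] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val (dirs, files) = children(root).partition(_.isDirectory)
      val perDir = Future.sequence(dirs.map(d => Future(children(d).filter(_.isFile))))
      files ++ Await.result(perDir, Duration.Inf).flatten
    } finally pool.shutdown()
  }
}
```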
[jira] [Commented] (SPARK-20886) HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null
[ https://issues.apache.org/jira/browse/SPARK-20886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171386#comment-16171386 ] Steve Loughran commented on SPARK-20886: No, but related. This handles the situation where the committer is a classic FileOutputCommitter (or a subclass, like the Parquet one), but asking it for a working dir returns null. SPARK-21549 looks like a hard-coded expectation that a destination dir has been set via the (private) config option used by FileOutputCommitter, with an NPE if it's not there. > HadoopMapReduceCommitProtocol to fail with message if > FileOutputCommitter.getWorkPath==null > --- > > Key: SPARK-20886 > URL: https://issues.apache.org/jira/browse/SPARK-20886 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Steve Loughran >Assignee: Steve Loughran >Priority: Trivial > Fix For: 2.3.0 > > > This is minor, and the root cause is my fault *elsewhere*, but it's the patch > I used to track down the problem. > If {{HadoopMapReduceCommitProtocol}} has a {{FileOutputCommitter}} for > committing things, and *somehow* that's been configured with a > {{JobAttemptContext}}, not a {{TaskAttemptContext}}, then the committer NPEs. > A {{require()}} statement can validate the working path and so point the > blame at whoever's code is confused.
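The `require()` check proposed in this issue can be sketched as follows. `WorkPathSource` is a hypothetical one-method view of a committer, used so the example runs without Hadoop; in the real code the value would come from `FileOutputCommitter.getWorkPath()`. The point of the design is that a null work path fails immediately with a message naming the committer, instead of surfacing later as an NPE.

```scala
object WorkPathCheck {
  /** Hypothetical one-method view of a committer; workPath may be null. */
  trait WorkPathSource { def workPath: String }

  def validatedWorkPath(committer: WorkPathSource): String = {
    val path = committer.workPath
    // require() throws IllegalArgumentException("requirement failed: ...") when the check fails.
    require(path != null,
      s"Committer ${committer.getClass.getName} has no work path: " +
        "was it set up with a job context instead of a task attempt context?")
    path
  }
}
```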
[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171393#comment-16171393 ] Steve Loughran commented on SPARK-21549: Linking to SPARK-20045, which highlights that the commit logic, especially the abort code, needs to be resilient to failures, including the invoked {{OutputCommitter.abort()}} calls raising exceptions. While people implementing committers should be expected to write resilient abort routines, you can't rely on it. Same for calling fs.delete()...it could also fail, so wrapping everything in an exception handler would at least make abort resilient. > Spark fails to complete job correctly in case of OutputFormat which do not > write into hdfs > -- > > Key: SPARK-21549 > URL: https://issues.apache.org/jira/browse/SPARK-21549 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky > > Spark fails to complete job correctly in case of custom OutputFormat > implementations. > There are OutputFormat implementations which do not need to use > *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. > [But spark reads this property from the > configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] > while setting up an OutputCommitter > {code:javascript} > val committer = FileCommitProtocol.instantiate( > className = classOf[HadoopMapReduceCommitProtocol].getName, > jobId = stageId.toString, > outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), > isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] > committer.setupJob(jobContext) > {code} > ...
and then uses this property later on while [committing the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132], > [aborting the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141], > [creating task's temp > path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]. > In such cases, when the job completes, the following exception is thrown: > {code} > Can not create a Path from a null string > java.lang.IllegalArgumentException: Can not create a Path from a null string > at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) > at org.apache.hadoop.fs.Path.<init>(Path.java:135) > at org.apache.hadoop.fs.Path.<init>(Path.java:89) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) > at > org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) > ...
> {code} > So it seems that all the jobs which use OutputFormats which don't write data > into HDFS-compatible file systems are broken.
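The suggestion in the first comment on this issue, wrapping every abort step in an exception handler so that one failing step does not prevent the others from running, might look like this. It is a minimal plain-Scala sketch: `ResilientAbort` and the step names are hypothetical, and real code would log each failure rather than only return it.

```scala
import scala.util.control.NonFatal

object ResilientAbort {
  /**
   * Run every cleanup step even if earlier ones throw. Non-fatal failures are
   * collected and returned (step name plus exception) instead of propagated.
   */
  def abortAll(steps: Seq[(String, () => Unit)]): Seq[(String, Throwable)] =
    steps.flatMap { case (name, step) =>
      try { step(); Nil }
      catch { case NonFatal(e) => List(name -> e) } // real code: log at warn, then carry on
    }
}
```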
[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171420#comment-16171420 ] Steve Loughran commented on SPARK-21549: # You can't rely on committers having output and temp dirs. Subclasses of {{FileOutputCommitter}} *must*, though there's no official mechanism for querying that because {{getOutputPath()}} is private. # Hadoop 3.0 has added (MAPREDUCE-6956) a new superclass of {{FileOutputCommitter}}, [PathOutputCommitter|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java], which pulls up the {{getWorkPath()}} method to be more general (so that you can have output committers which tell Spark and Hive where their intermediate data should go, without them being subclasses of FileOutputCommitter). # I'm happy to pull up {{getOutputPath}} to that class, and if people can give me a patch for it *this week* I'll add it for 3.0 beta 1. Regarding the committer here, you might want to think about moving off/subclassing {{HadoopMapReduceCommitProtocol}}. This is what I've done in [PathOutputCommitProtocol|https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/com/hortonworks/spark/cloud/PathOutputCommitProtocol.scala], though I can see it still relies on the superclass for that. Again, if we can patch the new PathOutputCommitter class to add a getOutputPath, I'll use that here. And yes, while that new MapReduce code will take a long time to surface in Spark core, you can use it independently from later this year. If you are playing with different committers outside Spark's own codebase, pick up the ORC/Hive tests from [https://github.com/hortonworks-spark/cloud-integration/tree/master/cloud-examples/src/test/scala/org/apache/spark/sql/sources].
These are just some of the Spark SQL tests, reworked slightly so that they'll work with any FileSystem implementation rather than just local file:// paths. Ping me directly if you are playing with new committers, and look at MAPREDUCE-6823 to see if that'd be useful to you (and how it could be improved, given it's still not in the codebase).
[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173449#comment-16173449 ] Steve Loughran commented on SPARK-21549: The {{newTaskTempFileAbsPath()}} method is an interesting spot of code...I'm still trying to work out when it is actually used. Some committers, like {{ManifestFileCommitProtocol}}, don't support it at all. However, if it is used, then your patch is going to cause problems if the destination FS != the default FS, because the part of the protocol which takes that list of temp files and rename()s them into their destination is going to fail. I think you'd be better off having the committer fail fast when an absolute path is asked for.
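The two points made in the comments on this issue (don't assume the output dir is set; fail fast when an absolute path is requested) can be combined in a small sketch. Everything here is hypothetical: `Protocol` is not Spark's `HadoopMapReduceCommitProtocol`, a plain `Map` stands in for Hadoop's `Configuration`, the `newTaskTempFileAbsPath` signature is simplified (no task context), and the `.spark-staging-` directory name is only illustrative.

```scala
object OutputPathAwareProtocol {
  val OutputDirKey = "mapreduce.output.fileoutputformat.outputdir"

  /** Hypothetical commit-protocol sketch; a plain Map stands in for Configuration. */
  final class Protocol(conf: Map[String, String], jobId: String) {
    /** The destination, if the OutputFormat set one; non-file formats may leave it unset. */
    val outputDir: Option[String] = conf.get(OutputDirKey).filter(_.nonEmpty)

    /** Only build a staging dir when there is a destination to stage under. */
    def stagingDir: Option[String] = outputDir.map(dir => s"$dir/.spark-staging-$jobId")

    /** Fail fast rather than create temp files that a later rename() might not move. */
    def newTaskTempFileAbsPath(absoluteDir: String, ext: String): String =
      throw new UnsupportedOperationException(
        s"${getClass.getName} does not support absolute output paths (asked for $absoluteDir)")
  }
}
```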
[jira] [Resolved] (SPARK-17159) Improve FileInputDStream.findNewFiles list performance
[ https://issues.apache.org/jira/browse/SPARK-17159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran resolved SPARK-17159. Resolution: Won't Fix. Based on the feedback on https://github.com/apache/spark/pull/14731, I'm doing all my Spark+cloud support elsewhere; closing as WONTFIX. > Improve FileInputDStream.findNewFiles list performance > -- > > Key: SPARK-17159 > URL: https://issues.apache.org/jira/browse/SPARK-17159 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.0.0 > Environment: spark against object stores >Reporter: Steve Loughran >Priority: Minor > > {{FileInputDStream.findNewFiles()}} is doing a globStatus with a filter that > calls getFileStatus() on every file, takes the output and does listStatus() > on the output. > This is going to suffer on object stores, as dir listing and getFileStatus calls > are so expensive. It's clear this is a problem, as the method has code to > detect timeouts in the window and warn of problems. > It should be possible to make this faster.
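Why per-file status calls hurt on object stores can be shown by counting requests against a fake store. This is a simplified, hypothetical model: `CountingStore` is not a Hadoop class, and the real `findNewFiles()` also involves globbing. Probing each listed file costs one extra request per file, while a listing that already returns modification times costs a single request.

```scala
object ListingCost {
  final case class Status(path: String, modTime: Long)

  /** Hypothetical in-memory object store that counts every remote request. */
  final class CountingStore(files: Map[String, Long]) {
    var requests = 0
    private def under(dir: String): Seq[String] =
      files.keys.filter(_.startsWith(dir + "/")).toSeq.sorted

    def listNames(dir: String): Seq[String] = { requests += 1; under(dir) }
    def listStatus(dir: String): Seq[Status] = { requests += 1; under(dir).map(p => Status(p, files(p))) }
    def getFileStatus(path: String): Status = { requests += 1; Status(path, files(path)) }
  }

  /** Per-file probing, in the spirit of the filter described above: 1 + N requests. */
  def newFilesByProbing(store: CountingStore, dir: String, since: Long): Seq[String] =
    store.listNames(dir).map(store.getFileStatus).filter(_.modTime > since).map(_.path)

  /** One listing whose results already carry modification times: 1 request. */
  def newFilesByListing(store: CountingStore, dir: String, since: Long): Seq[String] =
    store.listStatus(dir).filter(_.modTime > since).map(_.path)
}
```

With 100 files, the probing variant makes 101 requests and the listing variant makes one, for the same result.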
[jira] [Updated] (SPARK-22163) Design Issue of Spark Streaming that Causes Random Run-time Exception
[ https://issues.apache.org/jira/browse/SPARK-22163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran updated SPARK-22163: --- Priority: Major (was: Critical) > Design Issue of Spark Streaming that Causes Random Run-time Exception > - > > Key: SPARK-22163 > URL: https://issues.apache.org/jira/browse/SPARK-22163 > Project: Spark > Issue Type: Bug > Components: DStreams, Structured Streaming >Affects Versions: 2.2.0 > Environment: Spark Streaming > Kafka > Linux >Reporter: Michael N > > The application objects can contain Lists, which can be modified dynamically as > well. However, the Spark Streaming framework asynchronously serializes the > application's objects as the application runs. Therefore, it causes random > run-time exceptions on the List when the Spark Streaming framework happens to > serialize the application's objects while the application modifies a List in > its own object. > In fact, there are multiple bugs reported about > Caused by: java.util.ConcurrentModificationException > at java.util.ArrayList.writeObject > that are permutations of the same root cause. So the design issue of the Spark > Streaming framework is that it does this serialization asynchronously. > Instead, it should either > 1. do this serialization synchronously. This is preferred to eliminate the > issue completely. Or > 2. allow it to be configured per application whether to do this serialization > synchronously or asynchronously, depending on the nature of each application. > Also, the Spark documentation should describe the conditions that trigger Spark > to do this type of serialization asynchronously, so that applications can work > around them until a fix is provided.
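The failure mode described in this report can be reproduced deterministically, together with one possible application-side mitigation. In this hypothetical sketch, `Mutator` is an element that modifies its own list during serialization, standing in for a concurrent writer thread; `ArrayList` checks its modification count after writing its elements and throws `ConcurrentModificationException` if it changed. Serializing a snapshot taken under a lock only helps if every writer takes the same lock; it is a workaround sketch, not a Spark fix.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializeWhileMutating {
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  /**
   * Deterministic stand-in for "another thread modifies the list while it is
   * being serialized": this element appends to its owning list from inside
   * its own Java-serialization writeObject hook.
   */
  class Mutator(@transient private val owner: java.util.ArrayList[AnyRef]) extends Serializable {
    private def writeObject(out: ObjectOutputStream): Unit = {
      owner.add("added during serialization")
      out.defaultWriteObject()
    }
  }

  /** Workaround sketch: serialize a copy taken under the lock the writers also hold. */
  def serializeSnapshot(list: java.util.ArrayList[AnyRef], lock: AnyRef): Array[Byte] = {
    val snapshot = lock.synchronized(new java.util.ArrayList[AnyRef](list))
    serialize(snapshot)
  }
}
```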
[jira] [Updated] (SPARK-22217) ParquetFileFormat to support arbitrary OutputCommitters
[ https://issues.apache.org/jira/browse/SPARK-22217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran updated SPARK-22217: --- Priority: Minor (was: Major) > ParquetFileFormat to support arbitrary OutputCommitters > --- > > Key: SPARK-22217 > URL: https://issues.apache.org/jira/browse/SPARK-22217 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Steve Loughran >Priority: Minor > > Although you can choose which committer is used to write DataFrames as Parquet data > via {{spark.sql.parquet.output.committer.class}}, you get a ClassCastException > if this is not a > {{org.apache.parquet.hadoop.ParquetOutputCommitter}} or subclass. > This is not consistent with the docs in SQLConf, which say > bq. The specified class needs to be a subclass of > org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass > of org.apache.parquet.hadoop.ParquetOutputCommitter. > It is simple to relax {{ParquetFileFormat}}'s requirements, though if the > user has set > {{parquet.enable.summary-metadata=true}} and set a committer which is not a > ParquetOutputCommitter, then they won't see the summary metadata.
[jira] [Created] (SPARK-22217) ParquetFileFormat to support arbitrary OutputCommitters if parquet.enable.summary-metadata is false
Steve Loughran created SPARK-22217: -- Summary: ParquetFileFormat to support arbitrary OutputCommitters if parquet.enable.summary-metadata is false Key: SPARK-22217 URL: https://issues.apache.org/jira/browse/SPARK-22217 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Steve Loughran Although you can choose which committer is used to write DataFrames as Parquet data via {{spark.sql.parquet.output.committer.class}}, you get a ClassCastException if this is not a {{org.apache.parquet.hadoop.ParquetOutputCommitter}} or subclass. This is not consistent with the docs in SQLConf, which say bq. The specified class needs to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter. It is simple to relax {{ParquetFileFormat}}'s requirements, though if the user has set {{parquet.enable.summary-metadata=true}} and set a committer which is not a ParquetOutputCommitter, then they won't see the summary metadata.
[jira] [Updated] (SPARK-22217) ParquetFileFormat to support arbitrary OutputCommitters
[ https://issues.apache.org/jira/browse/SPARK-22217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran updated SPARK-22217: --- Summary: ParquetFileFormat to support arbitrary OutputCommitters (was: ParquetFileFormat to support arbitrary OutputCommitters if parquet.enable.summary-metadata is false) > ParquetFileFormat to support arbitrary OutputCommitters > --- > > Key: SPARK-22217 > URL: https://issues.apache.org/jira/browse/SPARK-22217 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Steve Loughran > > Although you can choose which committer is used to write DataFrames as Parquet data > via {{spark.sql.parquet.output.committer.class}}, you get a ClassCastException > if this is not a > {{org.apache.parquet.hadoop.ParquetOutputCommitter}} or subclass. > This is not consistent with the docs in SQLConf, which say > bq. The specified class needs to be a subclass of > org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass > of org.apache.parquet.hadoop.ParquetOutputCommitter. > It is simple to relax {{ParquetFileFormat}}'s requirements, though if the > user has set > {{parquet.enable.summary-metadata=true}} and set a committer which is not a > ParquetOutputCommitter, then they won't see the summary metadata.
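The relaxed check proposed in SPARK-22217 could be modelled as follows. This is a hypothetical sketch, not Spark's {{ParquetFileFormat}} code. The stub classes stand in for {{org.apache.hadoop.mapreduce.OutputCommitter}} and {{org.apache.parquet.hadoop.ParquetOutputCommitter}}. The {{ACCEPT_NO_SUMMARY}} outcome (accept the committer, but warn that summary metadata will not be written) is an assumption about how the summary-metadata case might be handled.

```java
/** Stub stand-ins for the real Hadoop and Parquet committer classes (hypothetical). */
class OutputCommitterStub {}
class ParquetOutputCommitterStub extends OutputCommitterStub {}
class CustomCommitter extends OutputCommitterStub {}
class NotACommitter {}

enum Verdict { ACCEPT, ACCEPT_NO_SUMMARY, REJECT }

final class CommitterCheck {
    private CommitterCheck() {}

    /**
     * Any OutputCommitter subclass is accepted. Per the issue text, only a Parquet
     * committer produces the summary metadata, so other committers get a
     * warning-level verdict when parquet.enable.summary-metadata is true.
     */
    static Verdict check(Class<?> committer, boolean summaryMetadataEnabled) {
        if (!OutputCommitterStub.class.isAssignableFrom(committer)) {
            return Verdict.REJECT;                 // not a committer at all
        }
        if (ParquetOutputCommitterStub.class.isAssignableFrom(committer)) {
            return Verdict.ACCEPT;                 // the only case accepted today
        }
        return summaryMetadataEnabled ? Verdict.ACCEPT_NO_SUMMARY : Verdict.ACCEPT;
    }
}
```

The point of the design is to replace the implicit cast, which fails with a ClassCastException, with an explicit {{isAssignableFrom}} test against the documented base class.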
[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed
[ https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203495#comment-16203495 ] Steve Loughran commented on SPARK-22240: We've got a test in HADOOP-14943 which looks at part sizing; it confirms you only get one part back from s3a right now. > S3 CSV number of partitions incorrectly computed > > > Key: SPARK-22240 > URL: https://issues.apache.org/jira/browse/SPARK-22240 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0 >Reporter: Arthur Baudry > > Reading CSV out of S3 using S3A protocol does not compute the number of > partitions correctly in Spark 2.2.0. > With Spark 2.2.0 I get only one partition when loading a 14GB file > {code:java} > scala> val input = spark.read.format("csv").option("header", > "true").option("delimiter", "|").option("multiLine", > "true").load("s3a://") > input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: > string ... 36 more fields] > scala> input.rdd.getNumPartitions > res2: Int = 1 > {code} > While in Spark 2.0.2 I had: > {code:java} > scala> val input = spark.read.format("csv").option("header", > "true").option("delimiter", "|").option("multiLine", > "true").load("s3a://") > input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: > string ... 36 more fields] > scala> input.rdd.getNumPartitions > res2: Int = 115 > {code} > This introduces obvious performance issues in Spark 2.2.0. Maybe there is a > property that should be set to have the number of partitions computed > correctly. > I'm aware that .option("multiline","true") is not supported in Spark > 2.0.2, but it's not relevant here.
[jira] [Comment Edited] (SPARK-21999) ConcurrentModificationException - Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192732#comment-16192732 ] Steve Loughran edited comment on SPARK-21999 at 10/5/17 1:39 PM: - Apache projects are all open source, with an open community. The ASF welcomes contributions, and does request that people follow an etiquette of constructive discourse: https://community.apache.org/contributors/etiquette Personally, I've always found Sean to be helpful and polite, even when I've turned out to be utterly wrong. I'd advise listening to his feedback and not viewing it as personal criticism. Whatever problem you have, faulting him isn't going to fix it. Like I said, projects welcome contributors and their contributions. # One key area where people can get their hands dirty in a project is documentation. Do you think some content in the Spark Streaming guide could be improved? Why not add that section & submit a patch for review? # Because you'll need the latest source tree to hand to do that, before you even worry about documenting a problem, why not see if it "has gone away", or at least "moved somewhere else"? Working with the latest versions of the code may seem leading edge, but it's the best way to get your problems fixed, and the only way to pick up an immediate patch. # Even once an issue in a project is fixed, the fix doesn't reach you ASAP, at least not in a form you can pick up and use unless you have that build environment yourself. If you are using a software product built on Spark, well, you'll need to wait for that supplier to update their release, which happens on their release schedule. For the ASF releases, there's a release process and timetable. # Which means: for now, you have to come up with a solution to your problem.
Given it's related to data structure serialization, well, you can implement your own {{readObject}} and {{writeObject}} methods, and so perhaps wrap the structures being serialized with something which implements a broader locking structure across your data. It's what I'd try first, as it would be an interesting little problem: a data structure which can have mutating operations invoked while serialization is in progress. Testing this would be fun... you'd need some kind of mock OutputStream which could block on a write() so you could guarantee the writer thread was blocking at the correct place for the mutation call to trigger the failure. As to whether it's architectural, that's something which could be debated. Checkpointing the state of streaming applications is one of those challenging problems, especially if the notion of "where you are" across multiple input streams is hard, and, if the cost of preserving state is high, any attempt to block for the operation will hurt throughput. And changing checkpoint architectures is so fundamental that it's not some one-line patch. If you can fix your structures up to be robustly serializable during asynchronous writes, you'll get performance as well as robustness.
[jira] [Commented] (SPARK-21999) ConcurrentModificationException - Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192732#comment-16192732 ] Steve Loughran commented on SPARK-21999: Apache projects are all open source, with an open community. The ASF welcomes contributions, and does request that people follow an etiquette of constructive discourse: https://community.apache.org/contributors/etiquette Personally, I've always found Sean to be helpful and polite, even when I've turned out to be utterly wrong. I'd advise listening to his feedback and not viewing it as personal criticism. Whatever problem you have, faulting him isn't going to fix it. Like I said, projects welcome contributors and their contributions. # One key area where people can get their hands dirty in a project is documentation. Do you think some content in the Spark Streaming guide could be improved? Why not add that section & submit a patch for review? # Because you'll need the latest source tree to hand to do that, before you even worry about documenting a problem, why not see if it "has gone away", or at least "moved somewhere else"? Working with the latest versions of the code may seem leading edge, but it's the best way to get your problems fixed, and the only way to pick up an immediate patch. # Even once an issue in a project is fixed, the fix doesn't reach you ASAP, at least not in a form you can pick up and use unless you have that build environment yourself. If you are using a software product built on Spark, well, you'll need to wait for that supplier to update their release, which happens on their release schedule. For the ASF releases, there's a release process and timetable. # Which means: for now, you have to come up with a solution to your problem.
Given it's related to data structure serialization, well, you can implement your own {{readObject}} and {{writeObject}} methods, and so perhaps wrap the structures being serialized with something which implements a broader locking structure across your data. It's what I'd try first, as it would be an interesting little problem: a data structure which can have mutating operations invoked while serialization is in progress. Testing this would be fun... you'd need some kind of mock OutputStream which could block on a write() so you could guarantee the writer thread was blocking at the correct place for the mutation call to trigger the failure. As to whether it's architectural, well, that's something which could be debated. Checkpointing the state of streaming applications is one of those challenging problems, especially if the notion of "where you are" across multiple input streams is hard, and, if the cost of preserving state is high, any attempt to block for the operation will hurt throughput. And changing checkpoint architectures is so fundamental that it's not some one-line patch. If you can fix your structures up to be robustly serializable during asynchronous writes, you'll get performance as well as robustness. > ConcurrentModificationException - Spark Streaming > - > > Key: SPARK-21999 > URL: https://issues.apache.org/jira/browse/SPARK-21999 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Michael N >Priority: Critical > > Hi, > I am using Spark Streaming v2.1.0 with Kafka 0.8. I am getting > ConcurrentModificationException intermittently. When it occurs, Spark does > not honor the specified value of spark.task.maxFailures. So Spark aborts the > current batch and fetches the next batch, which results in lost data. Its > exception stack is listed below.
> This instance of ConcurrentModificationException is similar to the issue at > https://issues.apache.org/jira/browse/SPARK-17463, which was about > serialization of accumulators in heartbeats. However, my Spark Streaming app > does not use accumulators. > The stack trace listed below occurred on the Spark master in the Spark Streaming > driver at the time of data loss. > From the line of code in the first stack trace, can you tell which object > Spark was trying to serialize? What is the root cause of this issue? > Because this issue results in lost data as described above, could you have > this issue fixed ASAP? > Thanks. > Michael N., > > Stack trace of Spark Streaming driver > ERROR JobScheduler:91: Error generating jobs for time 150522493 ms > org.apache.spark.SparkException: Task not serializable > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) > at > org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) > at
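The workaround suggested in the SPARK-21999 comment (custom {{readObject}}/{{writeObject}} methods plus a lock shared with the mutators) might look like the following minimal sketch. {{GuardedList}} is a hypothetical wrapper, not a Spark API. It holds the lock only long enough to copy the list, then serializes the copy outside the lock. That way a checkpoint never blocks writers for the whole serialization, which addresses the throughput concern raised in the comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical wrapper: mutators and serialization share one monitor (this). */
class GuardedList<T extends Serializable> implements Serializable {
    private static final long serialVersionUID = 1L;
    // transient: written explicitly as a snapshot in writeObject()
    private transient ArrayList<T> items = new ArrayList<>();

    public synchronized void add(T item) { items.add(item); }

    public synchronized List<T> snapshot() { return new ArrayList<>(items); }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        ArrayList<T> copy;
        synchronized (this) {            // lock held only while copying
            copy = new ArrayList<>(items);
        }
        out.writeObject(copy);           // serialized outside the lock; writers can proceed
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        items = (ArrayList<T>) in.readObject();
    }

    /** Test helper: serialize and deserialize in memory. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> GuardedList<T> roundTrip(GuardedList<T> src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(src);
            }
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (GuardedList<T>) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The copy is shallow. If the elements are themselves mutable collections, they need the same treatment, or they must be immutable.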
[jira] [Commented] (SPARK-21999) ConcurrentModificationException - Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16194332#comment-16194332 ] Steve Loughran commented on SPARK-21999: Telling a project "their design is wrong" and expecting a co-operative response isn't going to work, and if something is a core part of the architecture, it's not going to change. That's why he's closing it. It'd be like going to the linux-kernel devs and demanding that they switch to a microkernel architecture, or to emacs-dev and pointing out that their key encodings make no sense on modern keyboards. These are all WONTFIX complaints where you aren't going to get any satisfaction from raising them, so why bother? bq. About your other points, I already modified my code to get around this issue. Good to hear. In my many years as a software developer, I've come to realise that software development is about working round the implementation and architectural decisions which my predecessors made, and which, in modern times, don't seem relevant. All while trying not to do the same things myself. It's a losing battle, but being able to work around problems is the fundamental skill that seems to hold bq. 1. In the first place, why does Spark serialize the application objects asynchronously while the streaming application is running continuously from batch to batch? Don't know. To find out, I'd find the lines where it takes place, select them in my IDE, hit "show history for selection" & work back from the pull requests. bq. 2. If Spark needs to do this type of serialization at all, why does it not do it at the end of the batch synchronously? It's how it checkpoints the state of a streaming application. That's a fundamental need. bq. But Sean did not provide the answers and instead just kept closing that ticket. If he does not know the answers or information for tickets, he should let someone else who has such information answer them. The source is there.
And along with the source comes the SCM history, which provides the rationale for most decisions. Now, to close this, and to stop Sean taking all the blame, I'll be closing this as a WONTFIX. Please only re-open it if you have something to contribute, in particular, as I mentioned, documentation. Grab the latest source, improve the streaming docs, follow the Spark contribution process & submit a GitHub pull request, and see how it goes. > ConcurrentModificationException - Spark Streaming > - > > Key: SPARK-21999 > URL: https://issues.apache.org/jira/browse/SPARK-21999 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Michael N >Priority: Critical > > Hi, > I am using Spark Streaming v2.1.0 with Kafka 0.8. I am getting > ConcurrentModificationException intermittently. When it occurs, Spark does > not honor the specified value of spark.task.maxFailures. So Spark aborts the > current batch and fetches the next batch, which results in lost data. Its > exception stack is listed below. > This instance of ConcurrentModificationException is similar to the issue at > https://issues.apache.org/jira/browse/SPARK-17463, which was about > serialization of accumulators in heartbeats. However, my Spark Streaming app > does not use accumulators. > The stack trace listed below occurred on the Spark master in the Spark Streaming > driver at the time of data loss. > From the line of code in the first stack trace, can you tell which object > Spark was trying to serialize? What is the root cause of this issue? > Because this issue results in lost data as described above, could you have > this issue fixed ASAP? > Thanks.
> Michael N., > > Stack trace of Spark Streaming driver > ERROR JobScheduler:91: Error generating jobs for time 150522493 ms > org.apache.spark.SparkException: Task not serializable > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) > at > org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792) > at >
[jira] [Resolved] (SPARK-21999) ConcurrentModificationException - Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran resolved SPARK-21999. Resolution: Won't Fix > ConcurrentModificationException - Spark Streaming > - > > Key: SPARK-21999 > URL: https://issues.apache.org/jira/browse/SPARK-21999 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Michael N >Priority: Critical > > Hi, > I am using Spark Streaming v2.1.0 with Kafka 0.8. I am getting > ConcurrentModificationException intermittently. When it occurs, Spark does > not honor the specified value of spark.task.maxFailures. So Spark aborts the > current batch and fetches the next batch, which results in lost data. Its > exception stack is listed below. > This instance of ConcurrentModificationException is similar to the issue at > https://issues.apache.org/jira/browse/SPARK-17463, which was about > serialization of accumulators in heartbeats. However, my Spark Streaming app > does not use accumulators. > The stack trace listed below occurred on the Spark master in the Spark Streaming > driver at the time of data loss. > From the line of code in the first stack trace, can you tell which object > Spark was trying to serialize? What is the root cause of this issue? > Because this issue results in lost data as described above, could you have > this issue fixed ASAP? > Thanks.
> Michael N., > > Stack trace of Spark Streaming driver > ERROR JobScheduler:91: Error generating jobs for time 150522493 ms > org.apache.spark.SparkException: Task not serializable > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) > at > org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792) > at > org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37) > at > org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37) > at scala.Option.map(Option.scala:146) > at > org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) > 
at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) > at scala.Option.orElse(Option.scala:289) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) > at > org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) > at > org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at > org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at
[jira] [Commented] (SPARK-2984) FileNotFoundException on _temporary directory
[ https://issues.apache.org/jira/browse/SPARK-2984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207573#comment-16207573 ] Steve Loughran commented on SPARK-2984: --- bq. multiple batches writing to same location simultaneously Hadoop's {{FileOutputCommitter}} cleans up $dest/_temporary while committing or aborting a job. If you are writing more than one job to the same directory tree simultaneously, expect the job cleanup in one job to break the others. You could try overriding ParquetOutputCommitter.cleanupJob() to stop this, but it's probably safer to work out why output to the same path is happening in parallel and stop it. > FileNotFoundException on _temporary directory > - > > Key: SPARK-2984 > URL: https://issues.apache.org/jira/browse/SPARK-2984 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.1.0 >Reporter: Andrew Ash >Assignee: Josh Rosen >Priority: Critical > Fix For: 1.3.0 > > > We've seen several stacktraces and threads on the user mailing list where > people are having issues with a {{FileNotFoundException}} stemming from an > HDFS path containing {{_temporary}}. > I ([~aash]) think this may be related to {{spark.speculation}}. I think the > error condition might manifest in this circumstance: > 1) task T starts on an executor E1 > 2) it takes a long time, so task T' is started on another executor E2 > 3) T finishes in E1 so moves its data from {{_temporary}} to the final > destination and deletes the {{_temporary}} directory during cleanup > 4) T' finishes in E2 and attempts to move its data from {{_temporary}}, but > those files no longer exist! exception > Some samples: > {noformat} > 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming job > 140774430 ms.0 > java.io.FileNotFoundException: File > hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07 > does not exist.
> at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654) > at > org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102) > at > org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712) > at > org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708) > at > org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360) > at > org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) > at > org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136) > at > org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:841) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643) > at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:773) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:771) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) > at scala.util.Try$.apply(Try.scala:161) > at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172) > at > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {noformat} > -- Chen Song at > http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFiles-file-not-found-exception-td10686.html > {noformat} > I am running a Spark Streaming job that uses saveAsTextFiles to save results > into hdfs files. However, it has an exception after 20 batches > result-140631234/_temporary/0/task_201407251119__m_03 does not > exist. > {noformat} > and > {noformat} >
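Following the advice in the SPARK-2984 comment to work out why output to the same path is happening in parallel and stop it, one option is to fail fast on the driver before a second job starts committing into a destination that is already in use. This is a minimal sketch, assuming all writes are issued from a single driver JVM. {{DestinationGuard}} is a hypothetical helper, not part of Spark or Hadoop, and it does nothing about speculative task attempts within a single job.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical guard: refuses to start a second job writing to a destination already in use. */
final class DestinationGuard {
    private static final Set<String> ACTIVE = ConcurrentHashMap.newKeySet();

    private DestinationGuard() {}

    /** Handle that releases the destination when the job finishes (use with try-with-resources). */
    static final class Lease implements AutoCloseable {
        private final String dest;

        private Lease(String dest) { this.dest = dest; }

        @Override
        public void close() { ACTIVE.remove(dest); }
    }

    /** Returns a lease, or throws if another job is already writing to dest. */
    static Lease acquire(String dest) {
        String key = normalize(dest);
        if (!ACTIVE.add(key)) {   // atomic: only one caller can register a given key
            throw new IllegalStateException("Another job is already writing to " + key
                + "; its commit or abort would delete " + key + "/_temporary");
        }
        return new Lease(key);
    }

    /** Treat "dir" and "dir/" as the same destination. */
    private static String normalize(String dest) {
        return dest.endsWith("/") ? dest.substring(0, dest.length() - 1) : dest;
    }
}
```

Typical use is to wrap each batch's save call in {{try (DestinationGuard.Lease lease = DestinationGuard.acquire(outputPath)) { ... }}}. A collision then surfaces as a clear error at submission time, rather than as a {{FileNotFoundException}} deep inside {{FileOutputCommitter.commitJob}}.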
[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed
[ https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203709#comment-16203709 ] Steve Loughran commented on SPARK-22240: [~hyukjin.kwon]: we now see that on s3a you only ever get one block back from the FS, which means that when Spark asks the FS "how many blocks is this file in?", the answer is 1. That's information which can be used to determine what gives the best IO performance across multiple executors. In HDFS, for a file in N blocks with R replicas, you get N * R block replicas you can read in parallel. For S3 it may look like there is just 1 block, but if you upload in multiple parts (as happens when the write gets above some threshold like 64 MB), then you do get parallelism on the read. There's no way to determine what the block size was on upload (maybe we could save it as a header in future), but we can get the FS to tell the query engine what the current block size is & so be consistent across writes and reads. Spark does its own partitioning too, with the default min size & other things controlling the decision, e.g. what the user asks for, the number of executors, and whether the format says it can be split (anything .gz == false, etc). So it may just be the multiline case. The local filesystem only returns one element for getFileBlockLocations(path, ...), so you can just check locally whether the problem exists when multiline = true vs false. > S3 CSV number of partitions incorrectly computed > > > Key: SPARK-22240 > URL: https://issues.apache.org/jira/browse/SPARK-22240 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0 >Reporter: Arthur Baudry > > Reading CSV out of S3 using S3A protocol does not compute the number of > partitions correctly in Spark 2.2.0.
> With Spark 2.2.0 I get only one partition when loading a 14GB file > {code:java} > scala> val input = spark.read.format("csv").option("header", > "true").option("delimiter", "|").option("multiLine", > "true").load("s3a://") > input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: > string ... 36 more fields] > scala> input.rdd.getNumPartitions > res2: Int = 1 > {code} > While in Spark 2.0.2 I had: > {code:java} > scala> val input = spark.read.format("csv").option("header", > "true").option("delimiter", "|").option("multiLine", > "true").load("s3a://") > input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: > string ... 36 more fields] > scala> input.rdd.getNumPartitions > res2: Int = 115 > {code} > This introduces obvious performance issues in Spark 2.2.0. Maybe there is a > property that should be set to have the number of partitions computed > correctly. > I'm aware that .option("multiline","true") is not supported in Spark > 2.0.2, but it's not relevant here.
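For the multiLine question in SPARK-22240, a rough model of how Spark 2.x's file-source scan sizes splits may help. This sketch is modelled on the {{maxSplitBytes}} calculation in {{FileSourceScanExec}}. That is an assumption: verify it against the Spark version in use. The inputs correspond to {{spark.sql.files.maxPartitionBytes}} (default 128 MB), {{spark.sql.files.openCostInBytes}} (default 4 MB) and the default parallelism. The key point is that a format which reports itself as non-splittable (which is how multiLine CSV appears to behave in Spark 2.2) yields one split per file, regardless of the block locations the filesystem reports.

```java
final class SplitMath {
    private SplitMath() {}

    static final long MB = 1024L * 1024L;

    /** Target split size: min(maxPartitionBytes, max(openCost, bytesPerCore)). */
    static long maxSplitBytes(long maxPartitionBytes, long openCostInBytes,
                              long[] fileLengths, int defaultParallelism) {
        long totalBytes = 0;
        for (long len : fileLengths) {
            totalBytes += len + openCostInBytes; // each file is charged an "open cost"
        }
        long bytesPerCore = totalBytes / defaultParallelism;
        return Math.min(maxPartitionBytes, Math.max(openCostInBytes, bytesPerCore));
    }

    /** Number of splits produced for one file. */
    static long splitsForFile(long fileLength, long maxSplitBytes, boolean splittable) {
        if (!splittable) {
            return 1;                                              // whole file in one split
        }
        return (fileLength + maxSplitBytes - 1) / maxSplitBytes;   // ceiling division
    }
}
```

For a single 14 GB file with 8 cores, the target split is 128 MB, giving 112 splits if the format is splittable and 1 if it is not. Spark then packs splits into partitions, so this model is not expected to reproduce the exact counts reported above.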
[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed
[ https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201863#comment-16201863 ] Steve Loughran commented on SPARK-22240: Thanks. Now for a question whose answer is probably obvious to others: is there a straightforward way to determine the number of partitions a file is being broken into, other than running a job and counting the number of part- files? I guess even if there is, counting the files is straightforward. If it's multiline-related, do you think this would apply to all filesystems, or is it possible that s3a is making things worse? Anyway, whatever happens here, we'll get a fix for s3a's getBlockLocations in shortly.
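As a rough answer to "how many partitions will a file be broken into", here is a simplified model of the split-size calculation used by Spark 2.x file sources. It is a sketch, not Spark's code: it assumes the defaults {{spark.sql.files.maxPartitionBytes}} = 128 MB and {{spark.sql.files.openCostInBytes}} = 4 MB, and a {{splittable}} flag standing in for the format's own answer (a gzipped file, or, as assumed here, a multiLine CSV, would report false).

```java
final class SplitModel {
    static final long MB = 1024L * 1024L;
    // Assumed defaults for spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes.
    static final long MAX_PARTITION_BYTES = 128 * MB;
    static final long OPEN_COST_BYTES = 4 * MB;

    /** Split size for a scan over fileCount files totalling totalFileBytes, on defaultParallelism cores. */
    static long maxSplitBytes(long totalFileBytes, int fileCount, int defaultParallelism) {
        // Each file is charged an extra "open cost" when estimating the total work.
        long totalBytes = totalFileBytes + fileCount * OPEN_COST_BYTES;
        long bytesPerCore = totalBytes / defaultParallelism;
        return Math.min(MAX_PARTITION_BYTES, Math.max(OPEN_COST_BYTES, bytesPerCore));
    }

    /** Splits for one file: exactly 1 if the format cannot be split, else ceil(fileLen / splitBytes). */
    static long splitsForFile(long fileLen, boolean splittable, long splitBytes) {
        if (!splittable) {
            return 1;
        }
        return (fileLen + splitBytes - 1) / splitBytes;
    }
}
```

Under these assumptions, a 14 GB splittable file on 8 cores gets 128 MB splits and so 112 of them, while the same file in a non-splittable format is read as a single split, which would be consistent with the 1 partition reported with multiLine enabled.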
[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier
[ https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201917#comment-16201917 ] Steve Loughran commented on SPARK-21797: Update: in HADOOP-14874 I've noted that we could use the existing {{FileSystem.getContentSummary(Path)}} API to return {{StorageType.ARCHIVE}} for glaciated data. You'd need a way of filtering the listing of source files to strip out everything of archive type, but then yes, you could skip data in Glacier. > spark cannot read partitioned data in S3 that are partly in glacier > --- > > Key: SPARK-21797 > URL: https://issues.apache.org/jira/browse/SPARK-21797 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: Amazon EMR >Reporter: Boris Clémençon > Labels: glacier, partitions, read, s3 > > I have a dataset in parquet in S3 partitioned by date (dt) with oldest date > stored in AWS Glacier to save some money. For instance, we have... > {noformat} > s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier] > ... > s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier] > s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier] > ... > s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier] > {noformat} > I want to read this dataset, but only the subset of dates that are not yet in > Glacier, e.g.: > {code:java} > val from = "2017-07-15" > val to = "2017-08-24" > val path = "s3://my-bucket/my-dataset/" > val X = spark.read.parquet(path).where(col("dt").between(from, to)) > {code} > Unfortunately, I get this exception > {noformat} > java.io.IOException: > com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: > The operation is not valid for the object's storage class (Service: Amazon > S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: > C444D508B6042138) > {noformat} > It seems that Spark does not like partitioned datasets when some partitions are > in Glacier.
> I could always read each date specifically, add the column with the > current date and reduce(_ union _) at the end, but it's not pretty and it should > not be necessary. > Is there any tip to read the available data in the datastore even with old data > in Glacier?
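A minimal sketch of the listing filter described in the comment above, dropping archived objects before Spark plans the read. {{ArchiveFilter.ListedFile}} and its {{storageClass}} string are hypothetical stand-ins for whatever listing exposes the storage class (HADOOP-14874 proposes surfacing it as {{StorageType.ARCHIVE}}); they are not an existing Hadoop or Spark API.

```java
import java.util.List;
import java.util.stream.Collectors;

final class ArchiveFilter {
    /** Hypothetical listing entry: an object path plus its S3 storage class. */
    static final class ListedFile {
        final String path;
        final String storageClass;

        ListedFile(String path, String storageClass) {
            this.path = path;
            this.storageClass = storageClass;
        }
    }

    /** Assumed check: objects in the GLACIER storage class cannot be read with a plain GET. */
    static boolean isArchived(ListedFile f) {
        return "GLACIER".equals(f.storageClass);
    }

    /** Keeps only the paths that can be read directly. */
    static List<String> readablePaths(List<ListedFile> listing) {
        return listing.stream()
                .filter(f -> !isArchived(f))
                .map(f -> f.path)
                .collect(Collectors.toList());
    }
}
```

One caveat, as far as I know: an object restored from Glacier still reports the GLACIER storage class while its temporary copy is readable, so a real filter would also need to check the restore status.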
[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed
[ https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203966#comment-16203966 ] Steve Loughran commented on SPARK-22240: Point me at a simple test suite for the multiline option and I'll see what I can do on s3a to explore its behaviour.
[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier
[ https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146193#comment-16146193 ] Steve Loughran commented on SPARK-21797: bq. No. That's a shame. I only came across the option when I pasted the stack trace into the IDE, and it said "enable this option". Sorry, I'm not sure what other strategies there are. Sean? Any ideas?
[jira] [Commented] (SPARK-20448) Document how FileInputDStream works with object storage
[ https://issues.apache.org/jira/browse/SPARK-20448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178236#comment-16178236 ] Steve Loughran commented on SPARK-20448: Thanks! > Document how FileInputDStream works with object storage > --- > > Key: SPARK-20448 > URL: https://issues.apache.org/jira/browse/SPARK-20448 > Project: Spark > Issue Type: Documentation > Components: Documentation >Affects Versions: 2.1.0 >Reporter: Steve Loughran >Assignee: Steve Loughran >Priority: Minor > Fix For: 2.3.0 > > > Object stores work differently from filesystems: intermediate writes are not > visible, and renames are really O(data) copies, not O(1) transactions. > This makes working with them as DStreams fundamentally different: you can > write straight into the destination. > 1. Document how FileInputDStreams scan directories for changes > 2. Document how object stores behave differently, and the implications > for users.
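For the first documentation point, here is a simplified model of how a file-based stream can scan a directory on each batch: it lists the directory and selects files whose modification time falls inside a remembered window ending at the batch time, and which it has not already selected. This is a sketch under those assumptions; {{DirectoryScanModel}} is a hypothetical name, not Spark's {{FileInputDStream}} code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DirectoryScanModel {
    private final long rememberWindowMs;
    // Paths already handed to earlier batches; a real implementation would prune this set.
    private final Set<String> seen = new HashSet<>();

    DirectoryScanModel(long rememberWindowMs) {
        this.rememberWindowMs = rememberWindowMs;
    }

    /** Returns paths from the listing (path -> modification time in ms) that are new for this batch. */
    List<String> scan(Map<String, Long> listing, long batchTimeMs) {
        long ignoreBefore = batchTimeMs - rememberWindowMs;
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Long> e : listing.entrySet()) {
            long modTime = e.getValue();
            boolean inWindow = modTime >= ignoreBefore && modTime <= batchTimeMs;
            if (inWindow && !seen.contains(e.getKey())) {
                selected.add(e.getKey());
            }
        }
        seen.addAll(selected);
        return selected;
    }
}
```

Because selection in this model is driven purely by the listed modification time, whatever timestamp the store reports for a newly visible object decides which batch, if any, picks it up.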
[jira] [Resolved] (SPARK-2356) Exception: Could not locate executable null\bin\winutils.exe in the Hadoop
[ https://issues.apache.org/jira/browse/SPARK-2356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran resolved SPARK-2356. Resolution: Duplicate > Exception: Could not locate executable null\bin\winutils.exe in the Hadoop > --- > > Key: SPARK-2356 > URL: https://issues.apache.org/jira/browse/SPARK-2356 > Project: Spark > Issue Type: Bug > Components: Windows >Affects Versions: 1.0.0, 1.1.1, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 1.4.1, 1.5.0, > 1.5.1, 1.5.2 >Reporter: Kostiantyn Kudriavtsev >Priority: Critical > > I'm trying to run some transformation on Spark; it works fine on a cluster > (YARN, Linux machines). However, when I'm trying to run it on a local machine > (Windows 7) under a unit test, I get errors (I don't use Hadoop; I'm reading a file > from the local filesystem): > {code} > 14/07/02 19:59:31 WARN NativeCodeLoader: Unable to load native-hadoop library > for your platform... using builtin-java classes where applicable > 14/07/02 19:59:31 ERROR Shell: Failed to locate the winutils binary in the > hadoop binary path > java.io.IOException: Could not locate executable null\bin\winutils.exe in the > Hadoop binaries.
> at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318) > at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333) > at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326) > at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76) > at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93) > at org.apache.hadoop.security.Groups.<init>(Groups.java:77) > at > org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240) > at > org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255) > at > org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283) > at > org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:36) > at > org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:109) > at > org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala) > at org.apache.spark.SparkContext.<init>(SparkContext.scala:228) > at org.apache.spark.SparkContext.<init>(SparkContext.scala:97) > {code} > This happens because the Hadoop config is initialized each time a Spark > context is created, regardless of whether Hadoop is required or not. > I propose to add some special flag to indicate whether the Hadoop config is required > (or to start this configuration manually).
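The odd {{null\bin\winutils.exe}} path is what you get when no Hadoop home directory is configured. The sketch below models that, assuming the Hadoop 2.x lookup order (the {{hadoop.home.dir}} system property first, then the {{HADOOP_HOME}} environment variable) and hard-coding the Windows path separator; {{WinutilsPathModel}} is an illustrative name, not Hadoop's {{Shell}} class.

```java
final class WinutilsPathModel {
    /** Prefer the system property, fall back to the environment variable; either may be absent (null). */
    static String resolveHome(String homeDirProperty, String homeEnv) {
        return homeDirProperty != null ? homeDirProperty : homeEnv;
    }

    /** Builds home\bin\executable; with no home configured, string concatenation yields "null\bin\...". */
    static String qualifiedBinPath(String home, String executable) {
        return home + "\\bin\\" + executable;
    }
}
```

This is why the usual workaround is to set {{hadoop.home.dir}} (for example with {{System.setProperty}}) or {{HADOOP_HOME}} to a directory whose {{bin}} subdirectory contains {{winutils.exe}}, before the {{SparkContext}} is created.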
[jira] [Commented] (SPARK-2356) Exception: Could not locate executable null\bin\winutils.exe in the Hadoop
[ https://issues.apache.org/jira/browse/SPARK-2356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185744#comment-16185744 ] Steve Loughran commented on SPARK-2356: [~Vasilina], that probably means you're running with Hadoop <= 2.7; the more helpful message only went in with HADOOP-10775. Sorry. I'm about to close this as a duplicate of HADOOP-10775, as really it is a config problem (plus the need for the Hadoop libs to have a copy of winutils.exe around for file operations). All that can be done, short of removing that dependency, is fixing the error message, which we've done our best at.
[jira] [Commented] (SPARK-21817) Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex
[ https://issues.apache.org/jira/browse/SPARK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138683#comment-16138683 ] Steve Loughran commented on SPARK-21817: I think it's a regression in HDFS-6984: the superclass handles permissions being null, but the modified LocatedFileStatus constructor doesn't. Ewan: do a patch there with a new test method (where?) and I'll review it. > Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex > -- > > Key: SPARK-21817 > URL: https://issues.apache.org/jira/browse/SPARK-21817 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ewan Higgs >Priority: Minor > Attachments: SPARK-21817.001.patch > > > The implementation of HDFS-6984 now uses the passed-in {{FsPermission}} to > pull out the ACL and other information. Therefore passing in a {{null}} is no > longer adequate and hence causes an NPE when listing files.
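The regression pattern described above, where a superclass tolerates a null permission but a subclass constructor dereferences the raw argument, can be reproduced with toy classes. {{Perm}}, {{Status}} and {{LocatedStatus}} are hypothetical stand-ins, not Hadoop's {{FsPermission}}, {{FileStatus}} and {{LocatedFileStatus}}; reading the value back from the superclass is one plausible fix, not necessarily the one that was committed.

```java
final class Perm {
    static final Perm DEFAULT = new Perm(false);
    final boolean aclBit;

    Perm(boolean aclBit) {
        this.aclBit = aclBit;
    }
}

class Status {
    private final Perm permission;

    Status(Perm permission) {
        // The superclass substitutes a default when the caller passes null.
        this.permission = permission == null ? Perm.DEFAULT : permission;
    }

    final Perm getPermission() {
        return permission;
    }
}

final class LocatedStatus extends Status {
    final boolean hasAcl;

    LocatedStatus(Perm permission) {
        super(permission);
        // Buggy form: "permission.aclBit" throws NullPointerException when permission is null.
        // Fixed form: use the value the superclass has already normalised.
        this.hasAcl = getPermission().aclBit;
    }
}
```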
[jira] [Commented] (SPARK-21817) Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex
[ https://issues.apache.org/jira/browse/SPARK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138664#comment-16138664 ] Steve Loughran commented on SPARK-21817: Is this a regression in HDFS?
[jira] [Commented] (SPARK-21817) Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex
[ https://issues.apache.org/jira/browse/SPARK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138685#comment-16138685 ] Steve Loughran commented on SPARK-21817: The API is tagged as stable/evolving, and it's clearly in use downstream, so strictly speaking, yes, it's broken, and easily fixed. It's just not a codepath tested in HDFS.
[jira] [Commented] (SPARK-21702) Structured Streaming S3A SSE Encryption Not Visible through AWS S3 GUI when PartitionBy Used
[ https://issues.apache.org/jira/browse/SPARK-21702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139898#comment-16139898 ] Steve Loughran commented on SPARK-21702: If this is just "directories", then there are no directories in S3. We create mock ones for empty directories (i.e. after a mkdirs() call) as 0-byte objects. We then delete all such 0-byte objects when you write data underneath (see {{S3AFileSystem.deleteUnnecessaryFakeDirectories(Path)}}). I think that's what's been causing the confusion. I'm going to close this one as invalid, sorry. FWIW, if you do want to guarantee that data in a bucket is encrypted, set the bucket policy to mandate this. It's the best way to be confident that all your data is locked down: [https://hortonworks.github.io/hdp-aws/s3-encryption/index.html] > Structured Streaming S3A SSE Encryption Not Visible through AWS S3 GUI when > PartitionBy Used > > > Key: SPARK-21702 > URL: https://issues.apache.org/jira/browse/SPARK-21702 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 > Environment: Hadoop 2.7.3: AWS SDK 1.7.4 > Hadoop 2.8.1: AWS SDK 1.10.6 >Reporter: George Pongracz >Priority: Minor > Labels: security > > Settings: > .config("spark.hadoop.fs.s3a.impl", > "org.apache.hadoop.fs.s3a.S3AFileSystem") > .config("spark.hadoop.fs.s3a.server-side-encryption-algorithm", > "AES256") > When writing to an S3 sink from structured streaming, the files are being > encrypted using AES-256. > When introducing a "PartitionBy", the output data files are unencrypted. > All other supporting files and metadata are encrypted. > I suspect the write to temp is encrypted and the move/rename is not applying the SSE.
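A toy model of the directory-marker behaviour described above: {{mkdirs()}} writes a 0-byte object whose key ends in {{/}}, and writing a file underneath deletes the markers for all of its parent paths. The class and method names are hypothetical; this illustrates the idea, not S3A's code.

```java
import java.util.Map;
import java.util.TreeMap;

final class FakeDirStore {
    // Object key -> size in bytes. A key ending in "/" with size 0 is a directory marker.
    private final Map<String, Long> objects = new TreeMap<>();

    /** Simulates mkdirs() on an empty directory: just a 0-byte marker object. */
    void mkdirs(String dir) {
        objects.put(dir.endsWith("/") ? dir : dir + "/", 0L);
    }

    /** Writes a real object, then removes the markers for every parent "directory". */
    void write(String path, long size) {
        objects.put(path, size);
        int idx = path.lastIndexOf('/');
        while (idx > 0) {
            objects.remove(path.substring(0, idx + 1));
            idx = path.lastIndexOf('/', idx - 1);
        }
    }

    boolean exists(String key) {
        return objects.containsKey(key);
    }
}
```

In this model, the marker objects seen in a console after {{mkdirs()}} disappear as soon as data lands beneath them, so their properties say nothing about the data files that replace them.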
[jira] [Resolved] (SPARK-21702) Structured Streaming S3A SSE Encryption Not Visible through AWS S3 GUI when PartitionBy Used
[ https://issues.apache.org/jira/browse/SPARK-21702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran resolved SPARK-21702. Resolution: Invalid
[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier
[ https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140138#comment-16140138 ] Steve Loughran commented on SPARK-21797: I was talking about the cost and time of getting data from Glacier. If that's the only place where the data lives, then it's slow and expensive, and that's the bit I'm describing as niche. Given I've been working full time on S3A, I'm reasonably confident it gets used a lot. If you talk to data in S3 that has been backed up to Glacier, you *will get a 403*. According to Jeff Barr himself: https://aws.amazon.com/blogs/aws/archive-s3-to-glacier/ bq. If you archive objects using the Glacier storage option, you must inspect the storage class of an object before you attempt to retrieve it. The customary GET request will work as expected if the object is stored in S3 Standard or Reduced Redundancy (RRS) storage. It will fail (with a 403 error) if the object is archived in Glacier. In this case, you must use the RESTORE operation (described below) to make your data available in S3. bq. You use S3’s new RESTORE operation to access an object archived in Glacier. As part of the request, you need to specify a retention period in days. Restoring an object will generally take 3 to 5 hours. Your restored object will remain in both Glacier and S3’s Reduced Redundancy Storage (RRS) for the duration of the retention period. At the end of the retention period the object’s data will be removed from S3; the object will remain in Glacier. Like I said, I'd be interested in getting the full stack trace if you try to read this with an S3A client. Not for fixing, but for better reporting: the error message could probably point people at Jeff's blog entry. Or this JIRA :)
[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier
[ https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140147#comment-16140147 ] Steve Loughran commented on SPARK-21797: Note that if it is just during Spark's partition calculation, it's probably going down the directory tree and inspecting the files through HEAD requests, maybe looking at metadata entries too. So do attach the s3a and Spark stack traces so we can see what's going on, as something may be over-enthusiastic about looking at files, or we could have something recognise the problem and recover from it.
[jira] [Commented] (SPARK-21817) Pass FSPermissions to LocatedFileStatus from InMemoryFileIndex
[ https://issues.apache.org/jira/browse/SPARK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140161#comment-16140161 ] Steve Loughran commented on SPARK-21817: FYI, this is now fixed in Hadoop trunk/3.0-beta-1.
[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier
[ https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139919#comment-16139919 ] Steve Loughran commented on SPARK-21797: If you are using S3// URLs then its the AWS team's problem. If you were using s3a://, then it'd be something you ask the hadoop team to look at, but we'd say no as * it's a niche use case * It's really slow, as in "read() takes so long other bits of the system will start to think your worker is hanging". Which means if you have speculative execution turned on, they kick off other workers to read the data. * t's a very, very expensive way to work with data; $0.03/GB, which ramps up fast once multiple spark workers start reading the same datasets in parallel. * Finally, it's been rejected on the server with a 403 response. That's Amazon S3 saying "no", not any of the clients. You shouldn't be trying to process data direct from S3. Copy to S3 or a transient HDFS cluster, maybe as part of an oozie or airflow workflow. Be curious about the fulll stack trace you see if you do try this with s3a://, even though it'll still be a WONTFIX. We could at least go for a more meaningful exception translation, and the retry logic needs to know that it won't go away if you try again > spark cannot read partitioned data in S3 that are partly in glacier > --- > > Key: SPARK-21797 > URL: https://issues.apache.org/jira/browse/SPARK-21797 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Boris Clémençon > Labels: glacier, partitions, read, s3 > > I have a dataset in parquet in S3 partitioned by date (dt) with oldest date > stored in AWS Glacier to save some money. For instance, we have... > {noformat} > s3://my-bucket/my-dataset/dt=2017-07-01/[in glacier] > ... > s3://my-bucket/my-dataset/dt=2017-07-09/[in glacier] > s3://my-bucket/my-dataset/dt=2017-07-10/[not in glacier] > ... 
> s3://my-bucket/my-dataset/dt=2017-07-24/[not in glacier] > {noformat} > I want to read this dataset, but only the subset of dates that are not yet in > glacier, e.g.: > {code:java} > import org.apache.spark.sql.functions.col > val from = "2017-07-15" > val to = "2017-08-24" > val path = "s3://my-bucket/my-dataset/" > val X = spark.read.parquet(path).where(col("dt").between(from, to)) > {code} > Unfortunately, I get the exception > {noformat} > java.io.IOException: > com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: > The operation is not valid for the object's storage class (Service: Amazon > S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: > C444D508B6042138) > {noformat} > It seems that Spark does not like partitioned datasets when some partitions are > in Glacier. I could always read each date separately, add the column with > the current date and reduce(_ union _) at the end, but that's not pretty and it should > not be necessary. > Is there any tip for reading the available data in the datastore even with old data > in glacier?
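The report's own fallback, reading only the dates known to be outside Glacier, can be done as a single read by handing the reader an explicit list of partition directories. Below is a minimal Java sketch that builds such a list. It assumes the `dt=YYYY-MM-DD/` layout shown above and that the caller already knows which date range has not been archived. `PartitionPaths` and `daily` are hypothetical names.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: enumerates daily partition directories so that only
// known-unarchived dates are ever opened.
final class PartitionPaths {
  private PartitionPaths() {}

  // Returns "<base>dt=YYYY-MM-DD/" for every day in [from, to], inclusive.
  // Assumes base ends with "/"; LocalDate.toString() yields ISO yyyy-MM-dd.
  static List<String> daily(String base, LocalDate from, LocalDate to) {
    List<String> paths = new ArrayList<>();
    for (LocalDate d = from; !d.isAfter(to); d = d.plusDays(1)) {
      paths.add(base + "dt=" + d + "/");
    }
    return paths;
  }
}
```

In Spark's Java API, the list could then be passed as `spark.read().option("basePath", "s3://my-bucket/my-dataset/").parquet(paths.toArray(new String[0]))`. The `basePath` option keeps `dt` as a partition column. Every listed directory has to exist or the read fails, so the range should stay within dates that were actually written.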
[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier
[ https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140408#comment-16140408 ] Steve Loughran commented on SPARK-21797: This is happening deep in the Amazon EMR team's closed-source {{EmrFileSystem}}, so it's nothing anyone here at the ASF can deal with directly. I'm confident S3A will handle it pretty similarly, though, either in the open() call or shortly afterwards, in the first read(). All we could do there is convert it to a more meaningful error, or actually check whether the file is valid at open() time and, again, fail meaningfully. At the Spark level, it happens because Parquet is trying to read the footer of every file in parallel. The good news: you can tell Spark to ignore files it can't read. I believe this might be a quick workaround: {code} spark.sql.files.ignoreCorruptFiles=true {code} Let us know what happens.
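The point of `ignoreCorruptFiles` here is that it changes the failure policy: an unreadable file is logged and skipped instead of failing the whole scan. The following is a minimal Java model of that policy, not Spark's implementation. `FooterReader` stands in for whatever opens a file and reads its Parquet footer, and all names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical model of a "skip files that cannot be read" scan policy.
final class SkipUnreadable {
  private static final Logger LOG = Logger.getLogger(SkipUnreadable.class.getName());

  private SkipUnreadable() {}

  // Hypothetical stand-in for "open this file and read its footer".
  @FunctionalInterface
  interface FooterReader<T> {
    T read(String path) throws IOException;
  }

  // With skipUnreadable=false, the first IOException aborts the whole scan.
  // With skipUnreadable=true, it is logged and that file is left out of the result.
  static <T> List<T> readAll(List<String> paths, FooterReader<T> reader, boolean skipUnreadable) {
    List<T> results = new ArrayList<>();
    for (String path : paths) {
      try {
        results.add(reader.read(path));
      } catch (IOException e) {
        if (!skipUnreadable) {
          throw new UncheckedIOException("Failed to read " + path, e);
        }
        LOG.warning("Skipping unreadable file " + path + ": " + e.getMessage());
      }
    }
    return results;
  }
}
```

The trade-off is that archived partitions silently drop out of the result. That is what this query wants, but the same switch would also hide genuinely corrupt files elsewhere in the dataset.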
[jira] [Updated] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier
[ https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran updated SPARK-21797: --- Environment: Amazon EMR
[jira] [Comment Edited] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier
[ https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140408#comment-16140408 ] Steve Loughran edited comment on SPARK-21797 at 8/24/17 5:56 PM: - This is happening deep in the Amazon EMR team's closed-source {{EmrFileSystem}}, so it's nothing anyone here at the ASF can deal with directly. I'm confident S3A will handle it pretty similarly, though, either in the open() call or shortly afterwards, in the first read(). All we could do there is convert it to a more meaningful error, or actually check whether the file is valid at open() time and, again, fail meaningfully. At the Spark level, it happens because Parquet is trying to read the footer of every file in parallel. The good news: you can tell Spark to ignore files it can't read. I believe this might be a quick workaround: {code} spark.sql.files.ignoreCorruptFiles=true {code} Let us know what happens.
[jira] [Commented] (SPARK-21797) spark cannot read partitioned data in S3 that are partly in glacier
[ https://issues.apache.org/jira/browse/SPARK-21797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141483#comment-16141483 ] Steve Loughran commented on SPARK-21797: bq. According to our test, it is 20% slower maximum to read parquet data from S3 than HDFS. Do you agree? You are testing the Amazon EMR client. Try the Hadoop 2.8 JARs and the s3a client, enable the columnar-store-optimised seek policy with spark.hadoop.fs.s3a.experimental.input.fadvise=random, and see how things compare then. You will still be at a disadvantage with any directory scanning/walking, which can take seconds rather than milliseconds, and seeks are still expensive, as each one has to issue a new HTTP request with a different content range. And of course, AWS throttles your VMs, and throttles access per shard, i.e. to subtrees of a single bucket. Local HDFS still wins.
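The seek policy matters because of how much each (re)open asks for. Under a sequential policy, each open after a seek requests everything up to the end of the object. Under a random policy, it requests only the bytes needed, or a readahead block if that is larger. Below is a simplified Java model of how the request range could be chosen under that assumption. It is not the S3A source, and `Policy`, `requestRange`, and the `readahead` parameter are illustrative names.

```java
// Simplified, hypothetical model of choosing the byte range for one GET request.
final class ReadRangeModel {
  enum Policy { SEQUENTIAL, RANDOM }

  private ReadRangeModel() {}

  // Returns {start, endExclusive} for the HTTP GET issued when a stream is
  // (re)opened at pos to read len bytes of an object fileLength bytes long.
  static long[] requestRange(Policy policy, long pos, long len, long readahead, long fileLength) {
    long end = policy == Policy.SEQUENTIAL
        ? fileLength                                            // stream to EOF: suits whole-file scans
        : Math.min(fileLength, pos + Math.max(len, readahead)); // bounded block: suits columnar seeks
    return new long[] {pos, end};
  }

  // HTTP Range header value for the request; HTTP byte ranges are inclusive.
  static String rangeHeader(long[] range) {
    return "bytes=" + range[0] + "-" + (range[1] - 1);
  }
}
```

For a Parquet reader jumping between column chunks, the sequential policy means each seek abandons a stream that was opened to EOF and starts another one. This is why `random` tends to help columnar formats, although every seek still costs a new HTTP request.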
[jira] [Updated] (SPARK-21762) FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible
[ https://issues.apache.org/jira/browse/SPARK-21762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steve Loughran updated SPARK-21762: --- Summary: FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible (was: FileFormatWriter metrics collection fails if a newly close()d file isn't yet visible) > FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new > file isn't yet visible > > > Key: SPARK-21762 > URL: https://issues.apache.org/jira/browse/SPARK-21762 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 > Environment: object stores without complete creation consistency > (this includes AWS S3's caching of negative GET results) >Reporter: Steve Loughran >Priority: Minor > > The metrics collection of SPARK-20703 can trigger premature failure if the > newly written object isn't actually visible yet, that is, if, after > {{writer.close()}}, a {{getFileStatus(path)}} call raises a > {{FileNotFoundException}}. > Strictly speaking, not having a file immediately visible goes against the > fundamental expectations of the Hadoop FS APIs, namely fully consistent data & > metadata across all operations, with immediate global visibility of all > changes. However, not all object stores make that guarantee, be it for newly > created data or for updated blobs. And so spurious FNFEs can get raised, ones > which *should* have gone away by the time the actual task is committed. Or, if > they haven't, the job is in deep trouble anyway. > What to do? > # Leave as is: fail fast & so catch blobstores/blobstore clients which don't > behave as required. One issue here: will that trigger retries, what happens > there, etc., etc. > # Swallow the FNFE and hope the file is observable later. > # Swallow all IOEs and hope that whatever problem the FS has is transient. > Options 2 & 3 aren't going to collect metrics in the event of an FNFE, or at > least, not the counter of bytes written.
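Options 2 and 3 amount to catching the exception around the size probe. Below is a minimal Java sketch of option 3 combined with the logging policy proposed on SPARK-20703: an FNFE is logged at debug, and any other IOE gets one line at info with the full stack only at debug, returning 0 either way. `SizeLookup` is a hypothetical stand-in for `getFileStatus(path).getLen()`, and `java.util.logging`'s `FINE` level plays the role of debug.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of a metrics probe that never fails the task.
final class TolerantSizeProbe {
  private static final Logger LOG = Logger.getLogger(TolerantSizeProbe.class.getName());

  private TolerantSizeProbe() {}

  // Hypothetical stand-in for fs.getFileStatus(path).getLen().
  @FunctionalInterface
  interface SizeLookup {
    long lengthOf(String path) throws IOException;
  }

  // On any IOException, the bytes-written figure for this file is reported as 0.
  static long bytesWritten(SizeLookup fs, String path) {
    try {
      return fs.lengthOf(path);
    } catch (FileNotFoundException e) {
      // Expected on stores without create consistency: quiet unless FINE (debug) is enabled.
      LOG.log(Level.FINE, "Not yet visible: " + path, e);
      return 0L;
    } catch (IOException e) {
      // Anything else: one line at INFO, full stack trace only at FINE.
      LOG.info("Failed to get size of " + path + ": " + e);
      LOG.log(Level.FINE, "Full stack trace for " + path, e);
      return 0L;
    }
  }
}
```

The cost is the one already noted for options 2 and 3: a file that was not yet visible contributes nothing to the bytes-written counter.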
[jira] [Comment Edited] (SPARK-21762) FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible
[ https://issues.apache.org/jira/browse/SPARK-21762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131190#comment-16131190 ] Steve Loughran edited comment on SPARK-21762 at 8/17/17 7:41 PM: - SPARK-21669 simplifies this, especially testing, as it's isolated from FileFormatWriter. The same problem exists, though: if you are getting any create inconsistency, metrics probes trigger failures which may not be present by the time task commit actually takes place. was (Author: ste...@apache.org): SPARK-20703 simplifies this, especially testing, as it's isolated from FileFormatWriter. Same problem exists though: if you are getting any Create inconsistency, metrics probes trigger failures which may not be present by the time task commit actually takes place
[jira] [Commented] (SPARK-21762) FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible
[ https://issues.apache.org/jira/browse/SPARK-21762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131190#comment-16131190 ] Steve Loughran commented on SPARK-21762: SPARK-20703 simplifies this, especially testing, as it's isolated from FileFormatWriter. The same problem exists, though: if you are getting any create inconsistency, metrics probes trigger failures which may not be present by the time task commit actually takes place.
[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed
[ https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200367#comment-16200367 ] Steve Loughran commented on SPARK-22240: Amazon EMR is Amazon's own fork of Spark & Hadoop, with their own S3 connectors: they explicitly say [don't use s3a|http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-file-systems.html]. I was about to deny there was any problem with Spark & S3A, but after looking into things more, I think that HADOOP-14943 means that S3A is reporting each file as a single partition the size of the whole file, when really it should be splitting it up. As a result, the partitioning is going to be limited by the value set in {{SparkContext.defaultMinPartitions}} unless you pass a partition count to your {{SparkContext.hadoopRDD}} calls. I'd do that until I can fix things in S3A. > S3 CSV number of partitions incorrectly computed > > > Key: SPARK-22240 > URL: https://issues.apache.org/jira/browse/SPARK-22240 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0 >Reporter: Arthur Baudry > > Reading CSV out of S3 using the S3A protocol does not compute the number of > partitions correctly in Spark 2.2.0. > With Spark 2.2.0 I get only one partition when loading a 14GB file > {code:java} > scala> val input = spark.read.format("csv").option("header", > "true").option("delimiter", "|").option("multiLine", > "true").load("s3a://") > input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: > string ... 36 more fields] > scala> input.rdd.getNumPartitions > res2: Int = 1 > {code} > While in Spark 2.0.2 I had: > {code:java} > scala> val input = spark.read.format("csv").option("header", > "true").option("delimiter", "|").option("multiLine", > "true").load("s3a://") > input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: > string ...
36 more fields] > scala> input.rdd.getNumPartitions > res2: Int = 115 > {code} > This introduces obvious performance issues in Spark 2.2.0. Maybe there is a > property that should be set to have the number of partitions computed > correctly. > I'm aware that the .option("multiline","true") is not supported in Spark > 2.0.2; it's not relevant here.
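Passing a partition count helps because of the split arithmetic in the old-API `org.apache.hadoop.mapred.FileInputFormat` that `hadoopRDD` goes through. The split size is `max(minSize, min(goalSize, blockSize))`, where `goalSize` is the input length divided by the requested number of splits. Below is a simplified single-file Java model of that arithmetic. It includes the 1.1 "slop" factor that lets the last split run slightly over, and it shows why a filesystem reporting one block for the whole file yields one split unless a larger count is requested. `SplitMath` is a hypothetical name, and the model is an approximation, not the Hadoop source.

```java
// Hypothetical, simplified model of old-API FileInputFormat split sizing for one file.
final class SplitMath {
  // The last split may be up to 10% larger than splitSize before another is cut.
  static final double SPLIT_SLOP = 1.1;

  private SplitMath() {}

  static long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  // numSplits plays the role of the minPartitions hint passed down by hadoopRDD.
  static int countSplits(long fileLength, long blockSize, int numSplits, long minSize) {
    long goalSize = fileLength / Math.max(numSplits, 1);
    long splitSize = computeSplitSize(goalSize, minSize, blockSize);
    int splits = 0;
    long remaining = fileLength;
    while ((double) remaining / splitSize > SPLIT_SLOP) {
      splits++;
      remaining -= splitSize;
    }
    if (remaining != 0) {
      splits++; // the tail becomes the final split
    }
    return splits;
  }
}
```

When `blockSize` equals the file length, only the requested count breaks the file up. With a 128 MB block size, the file splits on its own. The model ignores multi-file inputs, where `goalSize` is computed over the total input size.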
[jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed
[ https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200485#comment-16200485 ] Steve Loughran commented on SPARK-22240: What's the link to the multiline JIRA? That could explain why nobody else has reported it.
[jira] [Commented] (SPARK-9103) Tracking spark's memory usage
[ https://issues.apache.org/jira/browse/SPARK-9103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182372#comment-16182372 ] Steve Loughran commented on SPARK-9103: --- If it helps, most uses of ByteBuffer in Hadoop core & HDFS pool their buffers through {{org.apache.hadoop.io.ByteBufferPool}} and {{org.apache.hadoop.util.DirectBufferPool}}. If some instrumentation could be added there just to measure pool size, and pools were created with an owner name, then reporting could help apportion blame. Rather than just saying "512MB were ByteBuffers", it could say "DFS Client used 189MB; CryptoInputStream 32MB", etc. > Tracking spark's memory usage > - > > Key: SPARK-9103 > URL: https://issues.apache.org/jira/browse/SPARK-9103 > Project: Spark > Issue Type: Umbrella > Components: Spark Core, Web UI >Reporter: Zhang, Liye > Attachments: Tracking Spark Memory Usage - Phase 1.pdf > > > Currently Spark provides only limited memory usage information (RDD cache on > webUI) for the executors. Users have no idea what the memory consumption is > when they are running Spark applications with a lot of memory used in Spark > executors. Especially when they encounter an OOM, it's really hard to know > what the cause of the problem is. So it would be helpful to give out detailed > memory consumption information for each part of Spark, so that users > can get a clear picture of where exactly the memory is used. > The memory usage info to expose should include, but not be limited to, shuffle, > cache, network, serializer, etc. > Users can optionally choose to enable this functionality, since it is mainly > for debugging and tuning.
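The owner-tagging idea can be sketched independently of Hadoop. Below is a minimal Java model whose only assumption about the real pool classes is a `getBuffer(direct, length)` / `putBuffer(buffer)` shape like that of `org.apache.hadoop.io.ByteBufferPool`. It only counts outstanding bytes per owner name and does no actual reuse. `OwnerTrackedBuffers` is a hypothetical class.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: per-owner accounting of buffers handed out and returned.
final class OwnerTrackedBuffers {
  // Outstanding (handed out, not yet returned) bytes per owner, process-wide.
  private static final Map<String, AtomicLong> OUTSTANDING = new ConcurrentHashMap<>();

  private final AtomicLong counter;

  OwnerTrackedBuffers(String owner) {
    // Instances created with the same owner name share one counter.
    this.counter = OUTSTANDING.computeIfAbsent(owner, k -> new AtomicLong());
  }

  ByteBuffer getBuffer(boolean direct, int length) {
    ByteBuffer buffer = direct ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
    counter.addAndGet(buffer.capacity());
    return buffer;
  }

  void putBuffer(ByteBuffer buffer) {
    // No reuse in this sketch: the buffer is simply released from the owner's tally.
    counter.addAndGet(-buffer.capacity());
  }

  // Sorted snapshot of outstanding bytes per owner, for reporting.
  static Map<String, Long> report() {
    Map<String, Long> snapshot = new TreeMap<>();
    OUTSTANDING.forEach((owner, bytes) -> snapshot.put(owner, bytes.get()));
    return snapshot;
  }
}
```

A per-owner breakdown like this is what would turn "512MB were ByteBuffers" into a statement about which component is holding the memory.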
[jira] [Commented] (SPARK-22587) Spark job fails if fs.defaultFS and application jar are different url
[ https://issues.apache.org/jira/browse/SPARK-22587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266697#comment-16266697 ] Steve Loughran commented on SPARK-22587: See also [FileSystem.CACHE.Key.isEqual()|https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3530], which is how the filesystem cache compares things (scheme + authority + UGI of the owner of the FS). FileSystem doesn't implement its own .equals() operator, and neither do any subclasses: they rely on Object.equals(). So if two paths produce equal cache keys, the cache will return the same FS instance for both (assuming the conf didn't disable caching), and comparing the instances works. You could probably implement a reliable check as follows:
{code}
// makeQualified() calls checkPath(), which throws IllegalArgumentException ("Wrong FS") for a path on another filesystem
try {
  srcFs.makeQualified(destFs.getWorkingDirectory)
  true
} catch {
  case _: IllegalArgumentException => false
}
{code}
Not ideal, as the false response is expensive; but since the client will copy the file whenever the two filesystems differ, the cost of raising an exception is lost in the noise. > Spark job fails if fs.defaultFS and application jar are different url > - > > Key: SPARK-22587 > URL: https://issues.apache.org/jira/browse/SPARK-22587 > Project: Spark > Issue Type: Bug > Components: Spark Submit >Affects Versions: 1.6.3 >Reporter: Prabhu Joseph > > Spark Job fails if the fs.defaultFs and the url where the application jar resides are > different but have the same scheme, > spark-submit --conf spark.master=yarn-cluster wasb://XXX/tmp/test.py > core-site.xml fs.defaultFS is set to wasb:///YYY. Hadoop list (hadoop > fs -ls) works for both the urls XXX and YYY.
> {code} > Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: > wasb://XXX/tmp/test.py, expected: wasb://YYY > at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:665) > at > org.apache.hadoop.fs.azure.NativeAzureFileSystem.checkPath(NativeAzureFileSystem.java:1251) > > at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:485) > at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:396) > at > org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:507) > > at > org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:660) > at > org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:912) > > at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:172) > at org.apache.spark.deploy.yarn.Client.run(Client.scala:1248) > at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1307) > at org.apache.spark.deploy.yarn.Client.main(Client.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:751) > > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > {code} > The code Client.copyFileToRemote tries to resolve the path of application jar > (XXX) from the FileSystem object created using fs.defaultFS url (YYY) instead > of the actual url of application jar. 
> val destFs = destDir.getFileSystem(hadoopConf) > val srcFs = srcPath.getFileSystem(hadoopConf) > getFileSystem will create the filesystem based on the url of the path and so > this is fine. But the lines of code below try to get the srcPath (XXX url) > from the destFs (YYY url), and so it fails. > var destPath = srcPath > val qualifiedDestPath = destFs.makeQualified(destPath)
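Where throwing and catching an exception is unwanted, the same-filesystem test can be done directly on the URIs. Compare the scheme and authority case-insensitively, and let a null authority match only another null (Hadoop notes the host may be null on a `file://` reference). Below is a minimal Java sketch using only `java.net.URI`. `FsUriMatch` is a hypothetical name. Default ports are deliberately not normalized, so `hdfs://namenode/` and `hdfs://namenode:9820/` compare as different here, even though Hadoop's own `FileSystem.checkPath` treats them as the same filesystem.

```java
import java.net.URI;

// Hypothetical helper: decide whether two URIs name the same filesystem without exceptions.
final class FsUriMatch {
  private FsUriMatch() {}

  // Null-safe, case-insensitive comparison: two nulls match, null vs non-null does not.
  static boolean eqIgnoreCase(String a, String b) {
    return a == null ? b == null : a.equalsIgnoreCase(b);
  }

  // Same scheme and same authority (userInfo@host:port), ignoring case.
  // Default ports are NOT filled in, so an explicit default port makes URIs differ.
  static boolean sameFileSystem(URI a, URI b) {
    return eqIgnoreCase(a.getScheme(), b.getScheme())
        && eqIgnoreCase(a.getAuthority(), b.getAuthority());
  }
}
```

For the failing case in this report, `sameFileSystem(URI.create("wasb://XXX/tmp/test.py"), URI.create("wasb://YYY/"))` is false, which is the signal to copy the jar rather than qualify its path against `destFs`.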
[jira] [Commented] (SPARK-22587) Spark job fails if fs.defaultFS and application jar are different url
[ https://issues.apache.org/jira/browse/SPARK-22587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266674#comment-16266674 ] Steve Loughran commented on SPARK-22587: Jerry had already pulled me in for this; it's one of those little "pits of semantics" you can get pulled into, "the incident pit" as they call it in SCUBA. Summary: you need a case-insensitive check for scheme and userInfo too, maybe even port. # Allow for null, though. # Consider using {{FileSystem.makeQualified(path)}} as the safety check. What does Hadoop get up to? * FileSystem.checkPath does a full check of (scheme, authority), using the authority of the canonicalized URI (including default ports), so hdfs://namenode/ and hdfs://namenode:9820/ refer to the same FS. That code dates from 2008, so it should be considered normative. * S3AFileSystem.checkPath only looks at hostnames, because it tries to strip out user:password from Paths for security reasons. * Wasb uses FileSystem.checkPath, but does some hacks to also handle an older scheme of "asv". I wouldn't worry about that little detail, though. * {{AbstractFileSystem.checkPath}} (the FileContext implementation code) doesn't check the authority; it looks at the host, and mentions the fact that on a file:// reference the host may be null. It raises {{InvalidPathException}} (a subclass of {{IllegalArgumentException}}) if it's unhappy. Overall, then: check the authority with .equalsIgnoreCase(), and allow for null. Worry about default ports if you really want to. I've filed HADOOP-15070 to cover this whole area better in docs & tests; it should make the FileContext/FileSystem checks consistent and raise the same InvalidPathException. One thing to consider is adding to the FS APIs a predicate {{isFileSystemPath(Path p)}} to do the validation without the overhead of exception throwing, implemented in one single (consistent) place. That wouldn't be there until Hadoop 3.1, though, so it's not of any immediate benefit.
thank you for bringing this undocumented, unspecified, untested and inconsistent logic to my attention :) > Spark job fails if fs.defaultFS and application jar are different url > - > > Key: SPARK-22587 > URL: https://issues.apache.org/jira/browse/SPARK-22587 > Project: Spark > Issue Type: Bug > Components: Spark Submit >Affects Versions: 1.6.3 >Reporter: Prabhu Joseph > > Spark Job fails if the fs.defaultFs and url where application jar resides are > different and having same scheme, > spark-submit --conf spark.master=yarn-cluster wasb://XXX/tmp/test.py > core-site.xml fs.defaultFS is set to wasb:///YYY. Hadoop list works (hadoop > fs -ls) works for both the url XXX and YYY. > {code} > Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: > wasb://XXX/tmp/test.py, expected: wasb://YYY > at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:665) > at > org.apache.hadoop.fs.azure.NativeAzureFileSystem.checkPath(NativeAzureFileSystem.java:1251) > > at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:485) > at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:396) > at > org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:507) > > at > org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:660) > at > org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:912) > > at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:172) > at org.apache.spark.deploy.yarn.Client.run(Client.scala:1248) > at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1307) > at org.apache.spark.deploy.yarn.Client.main(Client.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > at > 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:751) > > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > {code} > The code Client.copyFileToRemote tries to resolve the path of application jar > (XXX) from the FileSystem object created using fs.defaultFS url (YYY) instead > of the actual url of application jar. > val destFs = destDir.getFileSystem(hadoopConf) > val srcFs = srcPath.getFileSystem(hadoopConf) > getFileSystem will create the filesystem based on the url of the path and so > this is fine. But the
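To make the "case-insensitive, allow for null" advice concrete, here is a minimal sketch of a "same filesystem?" check on two {{java.net.URI}} values. {{SameFsCheck}} and {{sameFileSystem}} are hypothetical names, not Hadoop or Spark APIs. In this sketch two nulls count as a match while a single null does not, and default ports are not canonicalized, so hdfs://namenode/ and hdfs://namenode:9820/ still compare as different here (unlike {{FileSystem.checkPath}}).

```scala
import java.net.URI

// Hypothetical helper (not a Hadoop or Spark API): decides whether two URIs
// appear to refer to the same filesystem instance.
object SameFsCheck {

  // Case-insensitive comparison that allows for null:
  // two nulls match, a single null does not.
  private def sameIgnoreCase(a: String, b: String): Boolean =
    if (a == null || b == null) a == b
    else a.equalsIgnoreCase(b)

  // Compare scheme and authority (userInfo@host:port).
  // Default ports are NOT canonicalized in this sketch.
  def sameFileSystem(src: URI, dst: URI): Boolean =
    sameIgnoreCase(src.getScheme, dst.getScheme) &&
      sameIgnoreCase(src.getAuthority, dst.getAuthority)
}
```

A caller such as the copy decision in {{Client.copyFileToRemote}} from the report could use a check like this to decide whether a copy is needed; that wiring is not shown.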
[jira] [Commented] (SPARK-22526) Document closing of PortableDataInputStream in binaryFiles
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267015#comment-16267015 ] Steve Loughran commented on SPARK-22526: HADOOP-15071 updates the s3a troubleshooting doc with what it means when your code hangs on many reads; this will be the s3a side of the doc changes. > Document closing of PortableDataInputStream in binaryFiles > -- > > Key: SPARK-22526 > URL: https://issues.apache.org/jira/browse/SPARK-22526 > Project: Spark > Issue Type: Improvement > Components: Documentation, Spark Core >Affects Versions: 2.2.0 >Reporter: mohamed imran >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Hi, > I am using Spark 2.2.0(recent version) to read binary files from S3. I use > sc.binaryfiles to read the files. > It is working fine until some 100 file read but later it get hangs > indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in > the later releases) > I tried setting the fs.s3a.connection.maximum to some maximum values but > didn't help. > And finally i ended up using the spark speculation parameter set which is > again didnt help much. > One thing Which I observed is that it is not closing the connection after > every read of binary files from the S3. > example :- sc.binaryFiles("s3a://test/test123.zip") > Please look into this major issue!
[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265167#comment-16265167 ] Steve Loughran commented on SPARK-22526: It says so in the javadocs for [PortableDataStream.open()|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala#L176]. I don't see how it could be made much clearer there. I think it's implicitly "obvious" that streams need closing, because that's considered standard in all Java/Scala code. You either make sure the code you call does it (docs, or diving into it through the IDE), or you do it yourself. As close() is required to be idempotent, you can call it on an already-closed stream, so you may as well.
# There's nothing in {{SparkContext.binaryFiles()}}...you could maybe supply a mention there.
# There's only one use of {{PortableDataStream.open()}} which I can see in the code: {{PortableDataStream.toArray()}}; it does do a close() in a finally clause.
The uses in the tests are mostly verifying path resolution, except for those which use toArray(), which, as noted, closes things. I'm sorry you feel let down by the framework, but there's nothing which can be done in the code. At some point in the future someone could add something about binaryFiles to the markdown docs...perhaps, once you've used it more, you could be the volunteer?
> Spark hangs while reading binary files from S3 > -- > > Key: SPARK-22526 > URL: https://issues.apache.org/jira/browse/SPARK-22526 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: mohamed imran > Original Estimate: 168h > Remaining Estimate: 168h > > Hi, > I am using Spark 2.2.0(recent version) to read binary files from S3. I use > sc.binaryfiles to read the files. > It is working fine until some 100 file read but later it get hangs > indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in > the later releases) > I tried setting the fs.s3a.connection.maximum to some maximum values but > didn't help. > And finally i ended up using the spark speculation parameter set which is > again didnt help much. > One thing Which I observed is that it is not closing the connection after > every read of binary files from the S3. > example :- sc.binaryFiles("s3a://test/test123.zip") > Please look into this major issue!
[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264095#comment-16264095 ] Steve Loughran commented on SPARK-22526:
# Fix the code you invoke.
# Wrap the code you invoke with something like the following (coded in the JIRA, untested, and it should really close the stream in something that swallows IOEs):
{code}
// process() stands for your own per-file code; open() returns a new stream,
// and the caller is responsible for closing it.
binaryRdd.map { case (path, pds) =>
  val in = pds.open()
  try {
    process(in)
  } finally {
    in.close()
  }
}
{code}
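The "close the stream in something that swallows IOEs" part can be sketched as a small loan-pattern helper. {{StreamLoan}}, {{withStream}} and {{closeQuietly}} are hypothetical names, not Spark APIs. A close-time {{IOException}} is deliberately ignored so that it cannot mask the result, or the exception, of the work done on the stream.

```scala
import java.io.{Closeable, IOException}

// Hypothetical loan-pattern helper, not a Spark API.
object StreamLoan {

  // Close a stream, ignoring any IOException thrown by close() itself.
  def closeQuietly(c: Closeable): Unit =
    if (c != null) {
      try c.close()
      catch { case _: IOException => () }
    }

  // Open a stream, hand it to `body`, and always close it afterwards.
  // A failure inside `body` still propagates; a failure in close() does not.
  def withStream[S <: Closeable, T](open: => S)(body: S => T): T = {
    val stream = open
    try body(stream)
    finally closeQuietly(stream)
  }
}
```

In a job it might be used as binaryRdd.map { case (_, pds) => StreamLoan.withStream(pds.open())(process) }, where process again stands for your own per-file code.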
[jira] [Comment Edited] (SPARK-22526) Spark hangs while reading binary files from S3
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264095#comment-16264095 ] Steve Loughran edited comment on SPARK-22526 at 11/23/17 3:47 PM: --
# Fix the code you invoke.
# Wrap the code you invoke with something like the following (coded in the JIRA, untested, and it should really close the stream in something that swallows IOEs):
{code}
// process() stands for your own per-file code; open() returns a new stream,
// and the caller is responsible for closing it.
binaryRdd.map { case (path, pds) =>
  val in = pds.open()
  try {
    process(in)
  } finally {
    in.close()
  }
}
{code}
[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264496#comment-16264496 ] Steve Loughran commented on SPARK-22526: I'm not going to provide a permanent fix: it's a bug in your code, or in the code you are invoking, which "forgets to close the input stream". Unless [~srowen] has other ideas, I'd recommend closing this as one of WORKSFORME, WONTFIX or INVALID.
[jira] [Commented] (SPARK-22657) Hadoop fs implementation classes are not loaded if they are part of the app jar or other jar when --packages flag is used
[ https://issues.apache.org/jira/browse/SPARK-22657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16272974#comment-16272974 ] Steve Loughran commented on SPARK-22657: Hadoop FileSystem service introspection for FS binding has turned out to be more pain than it was worth; it needs to be revisited, but for other reasons. Why not just set fs.s3n.impl=org.apache.hadoop.fs.s3native.NativeS3FileSystem and bypass the service loader? Better yet, switch to s3a, which declares itself that way anyway on Hadoop 2.8+. > Hadoop fs implementation classes are not loaded if they are part of the app > jar or other jar when --packages flag is used > -- > > Key: SPARK-22657 > URL: https://issues.apache.org/jira/browse/SPARK-22657 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Stavros Kontopoulos > > To reproduce this issue run: > ./bin/spark-submit --master mesos://leader.mesos:5050 \ > --packages com.github.scopt:scopt_2.11:3.5.0 \ > --conf spark.cores.max=8 \ > --conf > spark.mesos.executor.docker.image=mesosphere/spark:beta-2.1.1-2.2.0-2-hadoop-2.6 > \ > --conf spark.mesos.executor.docker.forcePullImage=true \ > --class S3Job > http://s3-us-west-2.amazonaws.com/arand-sandbox-mesosphere/dcos-spark-scala-tests-assembly-0.1-SNAPSHOT.jar > \ > --readUrl s3n://arand-sandbox-mesosphere/big.txt --writeUrl > s3n://arand-sandbox-mesosphere/linecount.out > within a container created with > mesosphere/spark:beta-2.1.1-2.2.0-2-hadoop-2.6 image > You will get: "Exception in thread "main" java.io.IOException: No FileSystem > for scheme: s3n" > This can be run reproduced with local[*] as well, no need to use mesos, this > is not mesos bug. > The specific spark job used above can be found here: > https://github.com/mesosphere/spark-build/blob/d5c50e9ae3b1438e0c4ba96ff9f36d5dafb6a466/tests/jobs/scala/src/main/scala/S3Job.scala > > Can be built with sbt assembly in that dir.
> Using this code : > https://gist.github.com/skonto/4f5ff1e5ede864f90b323cc20bf1e1cb at the > beginning of the main method... > you get the following output : > https://gist.github.com/skonto/d22b8431586b6663ddd720e179030da4 > (Use > http://s3-eu-west-1.amazonaws.com/fdp-stavros-test/dcos-spark-scala-tests-assembly-0.1-SNAPSHOT.jar > to get the modified job) > The job works fine if --packages is not used. > The commit that introduced this issue is (before that things work as > expected): > 5800144a54f5c0180ccf67392f32c3e8a51119b1 - [SPARK-21012][SUBMIT] Add > glob support for resources adding to Spark (5 months ago) > Thu, 6 Jul 2017 15:32:49 +0800 > The exception comes from here: > https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3311 > https://github.com/apache/spark/pull/18235/files, check line 950, this is > where a filesystem is first created. > The Filesystem class is initialized there, before the main of the spark job > is launched... the reason is --packages logic uses hadoop libraries to > download files > Maven resolution happens before the app jar and the resolved jars are added > to the classpath. So at that moment there is no s3n to add to the static map > when the Filesystem static members are first initialized and also filled due > to the first FileSystem instance created (SERVICE_FILE_SYSTEMS). > Later in the spark job main where we try to access the s3n filesystem (create > a second filesystem) we get the exception (at this point the app jar has the > s3n implementation in it and its on the class path but that scheme is not > loaded in the static map of the Filesystem class)... > hadoopConf.set("fs.s3n.impl.disable.cache", "true") has no effect since the > problem is with the static map which is filled once and only once. > That's why we see two prints of the map contents in the output(gist) above > when --packages is used.
The first print is before creating the s3n > filesystem. We use reflection there to get the static map's entries. When > --packages is not used that map is empty before creating the s3n filesystem > since up to that point the Filesystem class is not yet loaded by the > classloader.
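The failure above comes down to lookup order. Below is a toy model, not Hadoop source, of the behaviour as described in this issue and, as far as I know, in Hadoop 2.x's {{FileSystem.getFileSystemClass()}}: the service map is populated once, on first use, and an explicit fs.<scheme>.impl setting is consulted before that map. Scheme and class names are plain strings here for illustration only. It shows why setting fs.s3n.impl sidesteps the stale map.

```scala
import java.io.IOException

// Toy model, not Hadoop code: scheme -> implementation lookup where an explicit
// "fs.<scheme>.impl" entry wins, and the service map is snapshotted only once.
object FsBindingModel {
  // What a service-loader scan would find on the classpath *right now*.
  @volatile var classpathServices: Map[String, String] =
    Map("hdfs" -> "DistributedFileSystem")

  // Filled on first lookup and never refreshed, like a static field.
  private var serviceMap: Option[Map[String, String]] = None

  private def services: Map[String, String] = synchronized {
    if (serviceMap.isEmpty) serviceMap = Some(classpathServices)
    serviceMap.get
  }

  def fileSystemClass(scheme: String, conf: Map[String, String]): String = {
    val snapshot = services // the first call freezes the map
    conf.get(s"fs.$scheme.impl")            // 1. explicit configuration
      .orElse(snapshot.get(scheme))         // 2. one-time service snapshot
      .getOrElse(throw new IOException(s"No FileSystem for scheme: $scheme"))
  }
}
```

In a real Spark job, the equivalent setting can be passed as --conf spark.hadoop.fs.s3n.impl=org.apache.hadoop.fs.s3native.NativeS3FileSystem, since spark.hadoop.* properties are copied into the Hadoop configuration.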
[jira] [Commented] (SPARK-22657) Hadoop fs implementation classes are not loaded if they are part of the app jar or other jar when --packages flag is used
[ https://issues.apache.org/jira/browse/SPARK-22657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16273191#comment-16273191 ] Steve Loughran commented on SPARK-22657: If you look at HADOOP-14138 you can see why we cut s3a from the list, and HADOOP-14132 shows how we culled the references. It's not only brittle; there's too much classloading overhead, especially with shading. I think we could switch to something more minimal, but even there we hadn't planned to reset the service enumeration on classloader changes. Happy to add a static method to trigger that, though. Indeed, if you submit the patch for HADOOP-14132, with tests, you can make sure it suits your needs.
[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263021#comment-16263021 ] Steve Loughran commented on SPARK-22526: If the input stream doesn't get closed, there's probably going to be a GET kept open for every request, so I'd make sure that whatever you are running in your RDD closes its streams. As the docs of {{PortableDataStream.open()}} say: "The user of this method is responsible for closing the stream after usage."
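Why an unclosed stream eventually turns into a hang can be illustrated with a toy connection pool. Each open stream holds one of a fixed number of connections (in S3A the limit is fs.s3a.connection.maximum), and a stream that is never closed never gives its connection back. {{ToyPool}} is a hypothetical model built on a {{java.util.concurrent.Semaphore}}, not S3A or AWS SDK code; its close() is idempotent, as {{java.io.Closeable}} requires.

```scala
import java.io.Closeable
import java.util.concurrent.{Semaphore, TimeUnit}

// Toy model, not S3A/AWS SDK code: a pool of `max` HTTP connections where every
// open stream holds one connection until it is closed.
final class ToyPool(max: Int) {
  private val permits = new Semaphore(max)

  final class ToyStream extends Closeable {
    private var open = true
    // Idempotent, as Closeable requires: a second close() releases nothing.
    // (Not thread-safe; fine for this single-threaded illustration.)
    override def close(): Unit = if (open) { open = false; permits.release() }
  }

  // Waits up to timeoutMs for a free connection. With leaked streams the pool
  // drains, and callers block and then give up: the "hang" in this report.
  def openStream(timeoutMs: Long): Option[ToyStream] =
    if (permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) Some(new ToyStream)
    else None
}
```

In this model, raising max only postpones the point at which openStream starts returning None; closing every stream is what keeps the pool from draining.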
[jira] [Commented] (SPARK-18294) Implement commit protocol to support `mapred` package's committer
[ https://issues.apache.org/jira/browse/SPARK-18294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292595#comment-16292595 ] Steve Loughran commented on SPARK-18294: Following up on this, one question: why support the older mapred protocol? The standard implementation in Hadoop just relays to the new one; supporting both complicates everyone's life, as there are two test paths, two APIs to document, and the risk of different failure modes. The v1 API isn't being actively developed, and really it's time to move off it. > Implement commit protocol to support `mapred` package's committer > - > > Key: SPARK-18294 > URL: https://issues.apache.org/jira/browse/SPARK-18294 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Reporter: Jiang Xingbo >Assignee: Jiang Xingbo > Fix For: 2.3.0 > > > Current `FileCommitProtocol` is based on `mapreduce` package, we should > implement a `HadoopMapRedCommitProtocol` that supports the older mapred > package's commiter.
[jira] [Commented] (SPARK-14959) Problem Reading partitioned ORC or Parquet files
[ https://issues.apache.org/jira/browse/SPARK-14959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256789#comment-16256789 ] Steve Loughran commented on SPARK-14959: I came across a reference to this while scanning for uses of getFileBlockLocations(). HDFS shouldn't be throwing this. {{getFileBlockLocations(Path, offset, len)}} is nominally the same as {{getFileBlockLocations(getFileStatus(Path), offset, len)}}; the latter will return an empty array on a directory. It looks like the HDFS behaviour has been there for years, and people can argue that it's the correct behaviour; but HDFS is the only subclass which diverges from the base FileSystem implementation, which doesn't fail on a directory. Maybe it can be fixed; at the very least, the behaviour needs to be specified explicitly. > Problem Reading partitioned ORC or Parquet files > - > > Key: SPARK-14959 > URL: https://issues.apache.org/jira/browse/SPARK-14959 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 > Environment: Hadoop 2.7.1.2.4.0.0-169 (HDP 2.4) >Reporter: Sebastian YEPES FERNANDEZ >Assignee: Xin Wu >Priority: Blocker > Fix For: 2.0.0 > > > Hello, > I have noticed that in the pasts days there is an issue when trying to read > partitioned files from HDFS. > I am running on Spark master branch #c544356 > The write actually works but the read fails. > {code:title=Issue Reproduction} > case class Data(id: Int, text: String) > val ds = spark.createDataset( Seq(Data(0, "hello"), Data(1, "hello"), Data(0, > "world"), Data(1, "there")) ) > scala> > ds.write.mode(org.apache.spark.sql.SaveMode.Overwrite).format("parquet").partitionBy("id").save("/user/spark/test.parquet") > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details.
> java.io.FileNotFoundException: Path is not a file: > /user/spark/test.parquet/id=0 > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75) > at > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:652) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2151) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2147) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2145) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) > at 
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) > at > org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1242) > at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1227) > at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1285) > at > org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:221) > at > org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:217) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at >
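The nominal equivalence described in the comment above can be sketched with a toy model. This is not Hadoop code: {{ToyFileStatus}} and {{ToyBlockLocation}} stand in for FileStatus and BlockLocation, and {{baseLocations}} only approximates the base FileSystem rule that no blocks are returned when start is at or past the file's length, which is why a zero-length directory yields an empty array. {{safeLocations}} is a hypothetical caller-side guard, not something proposed in the comment: it skips directories, so the result no longer depends on whether a store fails on them as HDFS does.

```scala
// Toy stand-ins for Hadoop's FileStatus and BlockLocation.
final case class ToyFileStatus(path: String, isDirectory: Boolean, len: Long)
final case class ToyBlockLocation(hosts: Seq[String], offset: Long, length: Long)

object BlockLocationsModel {
  // Approximation of the base-class rule: nothing at or beyond the end of the
  // data means no blocks; a directory's length is 0, so it yields no blocks.
  def baseLocations(st: ToyFileStatus, start: Long, len: Long): Array[ToyBlockLocation] = {
    require(start >= 0 && len >= 0, "Invalid start or len parameter")
    if (st.len <= start) Array.empty[ToyBlockLocation]
    else Array(ToyBlockLocation(Seq("localhost"), 0L, st.len))
  }

  // Hypothetical caller-side guard: never ask a store for a directory's blocks.
  def safeLocations(
      st: ToyFileStatus,
      lookup: (ToyFileStatus, Long, Long) => Array[ToyBlockLocation]): Array[ToyBlockLocation] =
    if (st.isDirectory) Array.empty[ToyBlockLocation]
    else lookup(st, 0L, st.len)
}
```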
[jira] [Commented] (SPARK-16996) Hive ACID delta files not seen
[ https://issues.apache.org/jira/browse/SPARK-16996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16255329#comment-16255329 ] Steve Loughran commented on SPARK-16996: [~maver1ck] Spark's Hive is a custom build, modified to sort out classpath inconsistencies between things (Kryo, Spark, Hive, something else). You can't mix them without seeing a stack trace somewhere...the main variable is "where", not "whether". Given this is HDP, not the ASF binaries, I'd take it up via the support process there, starting with the [online forums|https://community.hortonworks.com/index.html]. > Hive ACID delta files not seen > -- > > Key: SPARK-16996 > URL: https://issues.apache.org/jira/browse/SPARK-16996 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.2, 1.6.3, 2.1.2, 2.2.0 > Environment: Hive 1.2.1, Spark 1.5.2 >Reporter: Benjamin BONNET >Priority: Critical > > spark-sql seems not to see data stored as delta files in an ACID Hive table. > Actually I encountered the same problem as describe here : > http://stackoverflow.com/questions/35955666/spark-sql-is-not-returning-records-for-hive-transactional-tables-on-hdp > For example, create an ACID table with HiveCLI and insert a row : > {code} > set hive.support.concurrency=true; > set hive.enforce.bucketing=true; > set hive.exec.dynamic.partition.mode=nonstrict; > set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; > set hive.compactor.initiator.on=true; > set hive.compactor.worker.threads=1; > CREATE TABLE deltas(cle string,valeur string) CLUSTERED BY (cle) INTO 1 > BUCKETS > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' > STORED AS > INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' > OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' > TBLPROPERTIES ('transactional'='true'); > INSERT INTO deltas VALUES("a","a"); > {code} > Then make a query with spark-sql CLI : > {code} > SELECT * FROM deltas; > {code} > That query gets no result and
there are no errors in logs. > If you go to HDFS to inspect table files, you find only deltas > {code} > ~>hdfs dfs -ls /apps/hive/warehouse/deltas > Found 1 items > drwxr-x--- - me hdfs 0 2016-08-10 14:03 > /apps/hive/warehouse/deltas/delta_0020943_0020943 > {code} > Then if you run compaction on that table (in HiveCLI) : > {code} > ALTER TABLE deltas COMPACT 'MAJOR'; > {code} > As a result, the delta will be compute into a base file : > {code} > ~>hdfs dfs -ls /apps/hive/warehouse/deltas > Found 1 items > drwxrwxrwx - me hdfs 0 2016-08-10 15:25 > /apps/hive/warehouse/deltas/base_0020943 > {code} > Go back to spark-sql and the same query gets a result : > {code} > SELECT * FROM deltas; > a a > Time taken: 0.477 seconds, Fetched 1 row(s) > {code} > But next time you make an insert into Hive table : > {code} > INSERT INTO deltas VALUES("b","b"); > {code} > spark-sql will immediately see changes : > {code} > SELECT * FROM deltas; > a a > b b > Time taken: 0.122 seconds, Fetched 2 row(s) > {code} > Yet there was no other compaction, but spark-sql "sees" the base AND the > delta file : > {code} > ~> hdfs dfs -ls /apps/hive/warehouse/deltas > Found 2 items > drwxrwxrwx - valdata hdfs 0 2016-08-10 15:25 > /apps/hive/warehouse/deltas/base_0020943 > drwxr-x--- - valdata hdfs 0 2016-08-10 15:31 > /apps/hive/warehouse/deltas/delta_0020956_0020956 > {code}
[jira] [Commented] (SPARK-17593) list files on s3 very slow
[ https://issues.apache.org/jira/browse/SPARK-17593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247886#comment-16247886 ] Steve Loughran commented on SPARK-17593: Hey Nick, yes, the listing code needs to move to FileSystem.listFiles(path, recursive=true) and then iterate through the results. This actually scales better to many thousands of files, but you'd know that. I haven't done a patch for that myself. Moving to that won't be any worse for Spark on older Hadoop versions, but will get the read-time speedup on Hadoop 2.8+. Write performance is a separate issue, which is really "commit algorithms for blob storage". > list files on s3 very slow > -- > > Key: SPARK-17593 > URL: https://issues.apache.org/jira/browse/SPARK-17593 > Project: Spark > Issue Type: Bug >Affects Versions: 2.0.0 > Environment: spark 2.0.0, hadoop 2.7.2 (hadoop 2.7.3) >Reporter: Gaurav Shah >Priority: Minor > > Let's say we have the following partitioned data: > {code} > events_v3 > -- event_date=2015-01-01 > event_hour=0 > -- verb=follow > part1.parquet.gz > event_hour=1 > -- verb=click > part1.parquet.gz > -- event_date=2015-01-02 > event_hour=5 > -- verb=follow > part1.parquet.gz > event_hour=10 > -- verb=click > part1.parquet.gz > {code} > To read (or write) partitioned Parquet data, Spark calls > `ListingFileCatalog.listLeafFiles`, which recursively lists all > files and folders. > In this case, if we had 300 dates, we would have created 300 jobs, each trying > to get the file list from a date directory. This process takes about 10 minutes to > finish (with 2 executors), whereas a Ruby script that lists all > files recursively in the same folder takes about 1 minute on the same > machine with just 1 thread. > I am confused as to why listing files takes so much extra time.
> Spark code: > {code:scala} > val sparkSession = org.apache.spark.sql.SparkSession.builder > .config("spark.sql.hive.metastorePartitionPruning",true) > .config("spark.sql.parquet.filterPushdown", true) > .config("spark.sql.hive.verifyPartitionPath", false) > .config("spark.sql.hive.convertMetastoreParquet.mergeSchema",false) > .config("parquet.enable.summary-metadata",false) > .config("spark.sql.sources.partitionDiscovery.enabled",false) > .getOrCreate() > val df = > sparkSession.read.option("mergeSchema","false").format("parquet").load("s3n://bucket_name/events_v3") > df.createOrReplaceTempView("temp_events") > sparkSession.sql( > """ > |select verb,count(*) from temp_events where event_date = > "2016-08-05" group by verb > """.stripMargin).show() > {code} > Ruby code: > {code:ruby} > gem 'aws-sdk', '~> 2' > require 'aws-sdk' > client = Aws::S3::Client.new(:region=>'us-west-1') > next_continuation_token = nil > total = 0 > loop do > a= client.list_objects_v2({ > bucket: "bucket", # required > max_keys: 1000, > prefix: "events_v3/", > continuation_token: next_continuation_token , > fetch_owner: false, > }) > puts a.contents.last.key > total += a.contents.size > next_continuation_token = a.next_continuation_token > break unless a.is_truncated > end > puts "total" > puts total > {code} > I tried looking into the following bug: > https://issues.apache.org/jira/browse/HADOOP-12810 > but Hadoop 2.7.3 doesn't solve that problem. > Stack Overflow reference: > http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow
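The scaling argument in the comment above (a recursive `FileSystem.listFiles(path, true)`, which returns a `RemoteIterator[LocatedFileStatus]`, versus walking the directory tree) can be illustrated with a back-of-the-envelope count of S3 LIST calls. The Scala sketch below is a simplified model, not a measurement: it assumes a treewalk issues one paged LIST per directory, while a flat listing pages through every key under the prefix at up to 1,000 keys per call (the same `max_keys: 1000` page size the Ruby script uses). It ignores the extra per-path status probes a real S3 client may make and the parallelism of Spark's listing jobs. The names `ListingCost`, `treewalkRequests` and `flatRequests` are hypothetical.

```scala
// Back-of-the-envelope count of S3 LIST calls (hypothetical model, not a benchmark).
object ListingCost {
  // S3 returns at most 1000 keys per LIST page.
  val PageSize = 1000L

  // Pages needed to list `entries` items; an empty listing still costs one call.
  def pages(entries: Long): Long = math.max(1L, (entries + PageSize - 1) / PageSize)

  // Treewalk: one paged LIST per directory. fanouts(0) is the number of entries
  // under the root, fanouts(1) the number under each of those, and so on; the
  // last level's entries are the data files.
  def treewalkRequests(fanouts: Seq[Long]): Long = {
    require(fanouts.nonEmpty, "need at least one level")
    val dirsAtLevel = fanouts.init.scanLeft(1L)(_ * _) // 1, f0, f0*f1, ...
    dirsAtLevel.zip(fanouts).map { case (dirs, fanout) => dirs * pages(fanout) }.sum
  }

  // Flat listing: page through every file under the root prefix in one
  // sequence of calls, regardless of how deep the directory tree is.
  def flatRequests(fanouts: Seq[Long]): Long = {
    require(fanouts.nonEmpty, "need at least one level")
    pages(fanouts.product)
  }
}
```

For a hypothetical layout of 300 dates, 24 hours per date, 2 verbs per hour and one file per verb directory, `treewalkRequests(Seq(300L, 24L, 2L, 1L))` returns 21901 while `flatRequests` on the same layout returns 15. Only the gap is meaningful here; wall-clock time also depends on per-request latency and on how many calls run in parallel.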