[jira] [Commented] (SPARK-7721) Generate test coverage report from Python
[ https://issues.apache.org/jira/browse/SPARK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263944#comment-16263944 ] Hyukjin Kwon commented on SPARK-7721: - [~joshrosen], ahh, it turns out I duplicated the effort here before. So it seems the Jenkins <> Codecov integration is declined for now? One easy workaround is probably just to use GitHub Pages - https://pages.github.com/. We would probably only need to push the changes into a repo when the tests pass, which automatically updates the published page. I did this before to demonstrate SQL function docs: https://spark-test.github.io/sparksqldoc/ https://github.com/spark-test/sparksqldoc FWIW, I recently added a {{spark.python.use.daemon}} config, like SparkR has, to disable os.fork, and this (of course) makes it possible to track worker processes, although we should not disable it in Jenkins tests as it is extremely slow. It was good enough for small tests to verify a PR or a change, though. > Generate test coverage report from Python > - > > Key: SPARK-7721 > URL: https://issues.apache.org/jira/browse/SPARK-7721 > Project: Spark > Issue Type: Test > Components: PySpark, Tests >Reporter: Reynold Xin > > Would be great to have test coverage report for Python. Compared with Scala, > it is tricker to understand the coverage without coverage reports in Python > because we employ both docstring tests and unit tests in test files. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22585) Url encoding of jar path expected?
[ https://issues.apache.org/jira/browse/SPARK-22585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263979#comment-16263979 ] Jakub Dubovsky commented on SPARK-22585: Real file path is exactly the same as one I am passing into add jar. It contains "%3A443". > Url encoding of jar path expected? > -- > > Key: SPARK-22585 > URL: https://issues.apache.org/jira/browse/SPARK-22585 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Jakub Dubovsky > > I am calling {code}sparkContext.addJar{code} method with path to a local jar > I want to add. Example: > {code}/home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar{code}. > As a result I get an exception saying > {code} > Failed to add > /home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar to Spark > environment. Stacktrace: > java.io.FileNotFoundException: Jar > /home/me/.coursier/cache/v1/https/artifactory.com:443/path/to.jar not found > {code} > Important part to notice here is that colon character is url encoded in path > I want to use but exception is complaining about path in decoded form. This > is caused by this line of code from implementation ([see > here|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L1833]): > {code} > case null | "file" => addJarFile(new File(uri.getPath)) > {code} > It uses > [getPath|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html#getPath()] > method of > [java.net.URI|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html] > which url decodes the path. I believe method > [getRawPath|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html#getRawPath()] > should be used here which keeps path string in original form. > I tend to see this as a bug since I want to use my dependencies resolved from > artifactory with port directly. Is there some specific reason for this or can > we fix this? > Thanks -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
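For illustration, a small standalone sketch (not from the ticket) of the behaviour being discussed: {{java.net.URI#getPath}} decodes the percent-encoded colon while {{getRawPath}} preserves it; the path below is simply the reporter's example.
{code}
import java.net.URI

object UriPathDemo {
  def main(args: Array[String]): Unit = {
    // A file: URI whose last directory name contains a percent-encoded colon (%3A)
    val uri = new URI("file:/home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar")

    // getPath decodes %3A back to ':' -- this no longer matches the on-disk directory name
    println(uri.getPath)    // /home/me/.coursier/cache/v1/https/artifactory.com:443/path/to.jar

    // getRawPath keeps the original percent-encoded form
    println(uri.getRawPath) // /home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar
  }
}
{code}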
[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264095#comment-16264095 ] Steve Loughran commented on SPARK-22526: Two options: # Fix the code you invoke. # Wrap the code you invoke with something like the following (this is coded in the JIRA, untested, and should really wrap the close() in something that swallows IOEs): {code} binaryRdd.map { t => try { process(t._2) } finally { t._2.close() } } {code} > Spark hangs while reading binary files from S3 > -- > > Key: SPARK-22526 > URL: https://issues.apache.org/jira/browse/SPARK-22526 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: mohamed imran > Original Estimate: 168h > Remaining Estimate: 168h > > Hi, > I am using Spark 2.2.0(recent version) to read binary files from S3. I use > sc.binaryfiles to read the files. > It is working fine until some 100 file read but later it get hangs > indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in > the later releases) > I tried setting the fs.s3a.connection.maximum to some maximum values but > didn't help. > And finally i ended up using the spark speculation parameter set which is > again didnt help much. > One thing Which I observed is that it is not closing the connection after > every read of binary files from the S3. > example :- sc.binaryFiles("s3a://test/test123.zip") > Please look into this major issue! -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
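As a hedged variant of the second option (a sketch under the assumption that the caller opens the stream explicitly; {{process}} is a placeholder for the caller's own logic, not anything from the ticket):
{code}
import java.io.{DataInputStream, IOException}

// Placeholder for whatever the job actually does with each file's stream
def process(in: DataInputStream): Unit = { /* user logic goes here */ }

// sc is assumed to be an active SparkContext, e.g. in spark-shell
val binaryRdd = sc.binaryFiles("s3a://test/")   // RDD[(String, PortableDataStream)]

val results = binaryRdd.map { case (_, pds) =>
  val in = pds.open()                   // DataInputStream backed by the S3 object
  try {
    process(in)
  } finally {
    try in.close()                      // always release the S3 connection
    catch { case _: IOException => () } // swallow close() failures so they don't mask results
  }
}
{code}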
[jira] [Resolved] (SPARK-22588) SPARK: Load Data from Dataframe or RDD to DynamoDB / dealing with null values
[ https://issues.apache.org/jira/browse/SPARK-22588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-22588. --- Resolution: Invalid A question should go to the mailing list. Here it's not clear that it's not just a problem in your code, or in the Dynamo integration. You don't say where the error occurs. > SPARK: Load Data from Dataframe or RDD to DynamoDB / dealing with null values > - > > Key: SPARK-22588 > URL: https://issues.apache.org/jira/browse/SPARK-22588 > Project: Spark > Issue Type: Question > Components: Deploy >Affects Versions: 2.1.1 >Reporter: Saanvi Sharma >Priority: Minor > Labels: dynamodb, spark > Original Estimate: 24h > Remaining Estimate: 24h > > I am using spark 2.1 on EMR and i have a dataframe like this: > ClientNum | Value_1 | Value_2 | Value_3 | Value_4 > 14 |A |B| C | null > 19 |X |Y| null| null > 21 |R | null | null| null > I want to load data into DynamoDB table with ClientNum as key fetching: > Analyze Your Data on Amazon DynamoDB with apche Spark11 > Using Spark SQL for ETL3 > here is my code that I tried to solve: > var jobConf = new JobConf(sc.hadoopConfiguration) > jobConf.set("dynamodb.servicename", "dynamodb") > jobConf.set("dynamodb.input.tableName", "table_name") > jobConf.set("dynamodb.output.tableName", "table_name") > jobConf.set("dynamodb.endpoint", "dynamodb.eu-west-1.amazonaws.com") > jobConf.set("dynamodb.regionid", "eu-west-1") > jobConf.set("dynamodb.throughput.read", "1") > jobConf.set("dynamodb.throughput.read.percent", "1") > jobConf.set("dynamodb.throughput.write", "1") > jobConf.set("dynamodb.throughput.write.percent", "1") > > jobConf.set("mapred.output.format.class", > "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") > jobConf.set("mapred.input.format.class", > "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") > #Import Data > val df = > sqlContext.read.format("com.databricks.spark.csv").option("header", > "true").option("inferSchema", "true").load(path) > I performed a transformation to have an RDD that matches the types that the > DynamoDB custom output format knows how to write. The custom output format > expects a tuple containing the Text and DynamoDBItemWritable types. 
> Create a new RDD with those types in it, in the following map call: > #Convert the dataframe to rdd > val df_rdd = df.rdd > > df_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = > MapPartitionsRDD[10] at rdd at :41 > > #Print first rdd > df_rdd.take(1) > > res12: Array[org.apache.spark.sql.Row] = Array([14,A,B,C,null]) > var ddbInsertFormattedRDD = df_rdd.map(a => { > var ddbMap = new HashMap[String, AttributeValue]() > var ClientNum = new AttributeValue() > ClientNum.setN(a.get(0).toString) > ddbMap.put("ClientNum", ClientNum) > var Value_1 = new AttributeValue() > Value_1.setS(a.get(1).toString) > ddbMap.put("Value_1", Value_1) > var Value_2 = new AttributeValue() > Value_2.setS(a.get(2).toString) > ddbMap.put("Value_2", Value_2) > var Value_3 = new AttributeValue() > Value_3.setS(a.get(3).toString) > ddbMap.put("Value_3", Value_3) > var Value_4 = new AttributeValue() > Value_4.setS(a.get(4).toString) > ddbMap.put("Value_4", Value_4) > var item = new DynamoDBItemWritable() > item.setItem(ddbMap) > (new Text(""), item) > }) > This last call uses the job configuration that defines the EMR-DDB connector > to write out the new RDD you created in the expected format: > ddbInsertFormattedRDD.saveAsHadoopDataset(jobConf) > fails with the follwoing error: > Caused by: java.lang.NullPointerException > null values caused the error, if I try with ClientNum and Value_1 it works > data is correctly inserted on DynamoDB table. > Thanks for your help !! -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
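For what it's worth, a hedged sketch (not a tested fix for this job) of avoiding the NPE: the failure comes from turning a null column value into a string attribute, so skip null columns when building the item. Imports assume the same AWS SDK and emr-dynamodb-hadoop classes used in the snippet above; {{putIfPresent}} is a hypothetical helper name.
{code}
import java.util.HashMap
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
import org.apache.spark.sql.Row

// Hypothetical helper: add a string attribute only when the Row value is non-null;
// DynamoDB simply omits attributes that are not present on an item.
def putIfPresent(map: HashMap[String, AttributeValue], name: String, row: Row, idx: Int): Unit = {
  if (!row.isNullAt(idx)) {
    val av = new AttributeValue()
    av.setS(row.get(idx).toString)
    map.put(name, av)
  }
}

val ddbInsertFormattedRDD = df_rdd.map { row =>
  val ddbMap = new HashMap[String, AttributeValue]()
  val clientNum = new AttributeValue()
  clientNum.setN(row.get(0).toString)        // key column, assumed non-null
  ddbMap.put("ClientNum", clientNum)
  putIfPresent(ddbMap, "Value_1", row, 1)
  putIfPresent(ddbMap, "Value_2", row, 2)
  putIfPresent(ddbMap, "Value_3", row, 3)
  putIfPresent(ddbMap, "Value_4", row, 4)
  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
}
{code}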
[jira] [Commented] (SPARK-22393) spark-shell can't find imported types in class constructors, extends clause
[ https://issues.apache.org/jira/browse/SPARK-22393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264102#comment-16264102 ] Mark Petruska commented on SPARK-22393: --- The problem is also reproducible with non-spark related classes. Example: {code} import java.util.UUID class U(u: UUID) ... error: not found: type UUID ... {code} > spark-shell can't find imported types in class constructors, extends clause > --- > > Key: SPARK-22393 > URL: https://issues.apache.org/jira/browse/SPARK-22393 > Project: Spark > Issue Type: Bug > Components: Spark Shell >Affects Versions: 2.0.2, 2.1.2, 2.2.0 >Reporter: Ryan Williams >Priority: Minor > > {code} > $ spark-shell > … > scala> import org.apache.spark.Partition > import org.apache.spark.Partition > scala> class P(p: Partition) > :11: error: not found: type Partition >class P(p: Partition) > ^ > scala> class P(val index: Int) extends Partition > :11: error: not found: type Partition >class P(val index: Int) extends Partition >^ > {code} > Any class that I {{import}} gives "not found: type ___" when used as a > parameter to a class, or in an extends clause; this applies to classes I > import from JARs I provide via {{--jars}} as well as core Spark classes as > above. > This worked in 1.6.3 but has been broken since 2.0.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
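A hedged workaround sketch until the underlying REPL issue is fixed (behaviour may vary by Spark version): referring to the type by its fully qualified name avoids the broken import handling, since no simple-name lookup is needed.
{code}
// Workaround sketch for spark-shell: use fully qualified names in constructor
// parameters and extends clauses instead of relying on the imported simple name.
class U(u: java.util.UUID)
class P(p: org.apache.spark.Partition)
class Q(val index: Int) extends org.apache.spark.Partition
{code}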
[jira] [Updated] (SPARK-22165) Type conflicts between dates, timestamps and date in partition column
[ https://issues.apache.org/jira/browse/SPARK-22165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan updated SPARK-22165: Labels: release-notes (was: ) > Type conflicts between dates, timestamps and date in partition column > - > > Key: SPARK-22165 > URL: https://issues.apache.org/jira/browse/SPARK-22165 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1, 2.2.0, 2.3.0 >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Minor > Labels: release-notes > Fix For: 2.3.0 > > > It looks we have some bugs when resolving type conflicts in partition column. > I found few corner cases as below: > Case 1: timestamp should be inferred but date type is inferred. > {code} > val df = Seq((1, "2015-01-01"), (2, "2016-01-01 00:00:00")).toDF("i", "ts") > df.write.format("parquet").partitionBy("ts").save("/tmp/foo") > spark.read.load("/tmp/foo").printSchema() > {code} > {code} > root > |-- i: integer (nullable = true) > |-- ts: date (nullable = true) > {code} > Case 2: decimal should be inferred but integer is inferred. > {code} > val df = Seq((1, "1"), (2, "1" * 30)).toDF("i", "decimal") > df.write.format("parquet").partitionBy("decimal").save("/tmp/bar") > spark.read.load("/tmp/bar").printSchema() > {code} > {code} > root > |-- i: integer (nullable = true) > |-- decimal: integer (nullable = true) > {code} > Looks we should de-duplicate type resolution logic if possible rather than > separate numeric precedence-like comparison alone. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
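Until the inference itself is fixed, one hedged workaround (a sketch matching the first example above) is to supply the schema explicitly on read, which bypasses partition-column type inference; alternatively, setting {{spark.sql.sources.partitionColumnTypeInference.enabled}} to {{false}} keeps partition columns as plain strings.
{code}
import org.apache.spark.sql.types._

// Declare 'ts' as timestamp instead of letting partition inference fall back to date
val schema = new StructType()
  .add("i", IntegerType)
  .add("ts", TimestampType)

val df = spark.read.schema(schema).parquet("/tmp/foo")
df.printSchema()
// root
//  |-- i: integer (nullable = true)
//  |-- ts: timestamp (nullable = true)
{code}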
[jira] [Commented] (SPARK-22587) Spark job fails if fs.defaultFS and application jar are different url
[ https://issues.apache.org/jira/browse/SPARK-22587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264118#comment-16264118 ] Sean Owen commented on SPARK-22587: --- Hm, but if the src and dest FS are different, it overwrites destPath to be a path relative to destDir. I am not sure if that is the actual problem. Is it that compareFs believes incorrectly that these represent the same FS? If so then I do wonder if it makes sense to always set {{destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))}} This is some old logic from Sandy; maybe [~vanzin] or [~steve_l] has an opinion on the logic here. https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L356 > Spark job fails if fs.defaultFS and application jar are different url > - > > Key: SPARK-22587 > URL: https://issues.apache.org/jira/browse/SPARK-22587 > Project: Spark > Issue Type: Bug > Components: Spark Submit >Affects Versions: 1.6.3 >Reporter: Prabhu Joseph > > Spark Job fails if the fs.defaultFs and url where application jar resides are > different and having same scheme, > spark-submit --conf spark.master=yarn-cluster wasb://XXX/tmp/test.py > core-site.xml fs.defaultFS is set to wasb:///YYY. Hadoop list works (hadoop > fs -ls) works for both the url XXX and YYY. > {code} > Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: > wasb://XXX/tmp/test.py, expected: wasb://YYY > at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:665) > at > org.apache.hadoop.fs.azure.NativeAzureFileSystem.checkPath(NativeAzureFileSystem.java:1251) > > at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:485) > at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:396) > at > org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:507) > > at > org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:660) > at > org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:912) > > at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:172) > at org.apache.spark.deploy.yarn.Client.run(Client.scala:1248) > at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1307) > at org.apache.spark.deploy.yarn.Client.main(Client.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:751) > > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > {code} > The code Client.copyFileToRemote tries to resolve the path of application jar > (XXX) from the FileSystem object created using fs.defaultFS url (YYY) instead > of the actual url of application jar. > val destFs = destDir.getFileSystem(hadoopConf) > val srcFs = srcPath.getFileSystem(hadoopConf) > getFileSystem will create the filesystem based on the url of the path and so > this is fine. 
But the below lines of code tries to get the srcPath (XXX url) > from the destFs (YYY url) and so it fails. > var destPath = srcPath > val qualifiedDestPath = destFs.makeQualified(destPath) -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
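A hedged, self-contained sketch of the variant floated above (illustrative only, not the actual Client.scala patch): always resolve the destination under destDir, so the source's authority (wasb://XXX) is never qualified against fs.defaultFS (wasb://YYY).
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

// Sketch of copy-to-remote logic: whether the real same-filesystem shortcut in
// Client.copyFileToRemote must also change is exactly the open question here.
def copyToRemoteSketch(
    destDir: Path,
    srcPath: Path,
    destName: Option[String],
    hadoopConf: Configuration): Path = {
  val destFs = destDir.getFileSystem(hadoopConf)
  val srcFs  = srcPath.getFileSystem(hadoopConf)
  // Destination is always a child of destDir, qualified against destFs
  val destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
  // Copy whenever the fully qualified locations differ, rather than trusting compareFs alone
  if (srcFs.makeQualified(srcPath) != destFs.makeQualified(destPath)) {
    FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
  }
  destFs.makeQualified(destPath)   // guaranteed to belong to destFs, so no "Wrong FS" error
}
{code}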
[jira] [Commented] (SPARK-22585) Url encoding of jar path expected?
[ https://issues.apache.org/jira/browse/SPARK-22585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264129#comment-16264129 ] Sean Owen commented on SPARK-22585: --- Hm, I think the issue is rather that the path needs to be encoded before becoming part of the URI. The URI {{file:/home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar}} is not one that names your file; it's {{file:/home/me/.coursier/cache/v1/https/artifactory.com%253A443/path/to.jar}} (escaped %). That may be a workaround here. {{/home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar}} is however a valid and correct local path to the file, and that's what the argument is meant to be. So the code should do the encoding. I'm aware that there are a number of places that probably turn paths into URIs, so it would be best to try to update all usages of this form. I think it's low-risk as URI encoding won't do anything for most paths, and where it does, it's probably essential. > Url encoding of jar path expected? > -- > > Key: SPARK-22585 > URL: https://issues.apache.org/jira/browse/SPARK-22585 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Jakub Dubovsky > > I am calling {code}sparkContext.addJar{code} method with path to a local jar > I want to add. Example: > {code}/home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar{code}. > As a result I get an exception saying > {code} > Failed to add > /home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar to Spark > environment. Stacktrace: > java.io.FileNotFoundException: Jar > /home/me/.coursier/cache/v1/https/artifactory.com:443/path/to.jar not found > {code} > Important part to notice here is that colon character is url encoded in path > I want to use but exception is complaining about path in decoded form. This > is caused by this line of code from implementation ([see > here|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L1833]): > {code} > case null | "file" => addJarFile(new File(uri.getPath)) > {code} > It uses > [getPath|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html#getPath()] > method of > [java.net.URI|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html] > which url decodes the path. I believe method > [getRawPath|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html#getRawPath()] > should be used here which keeps path string in original form. > I tend to see this as a bug since I want to use my dependencies resolved from > artifactory with port directly. Is there some specific reason for this or can > we fix this? > Thanks -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
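A tiny sketch of the "encode before building the URI" idea (illustrative only): {{java.io.File#toURI}} percent-escapes the path, and the resulting URI round-trips back to the original local path.
{code}
import java.io.File

object PathToUriDemo {
  def main(args: Array[String]): Unit = {
    // Raw local path whose directory name contains a literal "%3A" sequence
    val rawPath = "/home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar"

    // File.toURI escapes the path, so the literal '%' becomes "%25" inside the URI
    val uri = new File(rawPath).toURI
    println(uri)   // file:/home/me/.coursier/cache/v1/https/artifactory.com%253A443/path/to.jar

    // Decoding the URI's path yields exactly the original local path again
    println(new File(uri).getPath == rawPath)   // true
  }
}
{code}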
[jira] [Created] (SPARK-22589) Subscribe to multiple roles in Mesos
Fabiano Francesconi created SPARK-22589: --- Summary: Subscribe to multiple roles in Mesos Key: SPARK-22589 URL: https://issues.apache.org/jira/browse/SPARK-22589 Project: Spark Issue Type: Wish Components: Spark Core Affects Versions: 2.2.0, 2.1.2 Reporter: Fabiano Francesconi Mesos offers the capability of [subscribing to multiple roles|http://mesos.apache.org/documentation/latest/roles/]. I believe that Spark could easily be extended to opt in to this specific capability. From my understanding, this is the [Spark source code|https://github.com/apache/spark/blob/fc45c2c88a838b8f46659ebad2a8f3a9923bc95f/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala#L94] that regulates the subscription to the role. I wonder whether just passing a comma-separated list of frameworks (hence, splitting on that string) would already be sufficient to leverage this capability. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
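A hedged sketch of what the proposed change might look like (assumptions: {{spark.mesos.role}} holding a comma-separated list is the proposal here, not current behaviour, and the MULTI_ROLE capability requires Mesos 1.2+ protobufs):
{code}
import org.apache.mesos.Protos.FrameworkInfo
import org.apache.spark.SparkConf

// Sketch only -- roughly where MesosSchedulerUtils.createSchedulerDriver sets the role today
val conf = new SparkConf()
val fwInfoBuilder = FrameworkInfo.newBuilder().setUser("spark").setName("app")

conf.getOption("spark.mesos.role").foreach { value =>
  value.split(",").map(_.trim).filter(_.nonEmpty) match {
    case Array(single) =>
      fwInfoBuilder.setRole(single)          // today's single-role behaviour
    case multiple =>
      // Multi-role subscription: advertise the MULTI_ROLE capability and list each role
      fwInfoBuilder.addCapabilities(
        FrameworkInfo.Capability.newBuilder()
          .setType(FrameworkInfo.Capability.Type.MULTI_ROLE))
      multiple.foreach(r => fwInfoBuilder.addRoles(r))
  }
}
{code}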
[jira] [Resolved] (SPARK-22560) Must create spark session directly to connect to hive
[ https://issues.apache.org/jira/browse/SPARK-22560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-22560. --- Resolution: Not A Problem Your second code snippet is correct. There is no problem here. In the first case you manually created a context without the right config. > Must create spark session directly to connect to hive > - > > Key: SPARK-22560 > URL: https://issues.apache.org/jira/browse/SPARK-22560 > Project: Spark > Issue Type: Bug > Components: Java API, SQL >Affects Versions: 2.1.0, 2.2.0 >Reporter: Ran Mingxuan > Original Estimate: 168h > Remaining Estimate: 168h > > In a java project I have to use both JavaSparkContext and SparkSession. I > find the order to create them affect hive connection. > I have built a spark job like below: > {code:java} > // wrong code > public void main(String[] args) > { > SparkConf sparkConf = new SparkConf().setAppName("testApp"); > JavaSparkContext sc = new JavaSparkContext(sparkConf); > SparkSession spark = > SparkSession.builder().sparkContext(sc.sc()).enableHiveSupport().getOrCreate(); > spark.sql("show databases").show(); > } > {code} > and with this code spark job will not be able to find hive meta-store even if > it can discover correct warehouse. > I have to use code like below to make things work: > {code:java} > // correct code > public String main(String[] args) > { > SparkConf sparkConf = new SparkConf().setAppName("testApp"); > SparkSession spark = > SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate(); > SparkContext sparkContext = spark.sparkContext(); > JavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkContext); > spark.sql("show databases").show(); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-22589) Subscribe to multiple roles in Mesos
[ https://issues.apache.org/jira/browse/SPARK-22589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabiano Francesconi updated SPARK-22589: Description: Mesos offers the capability of [subscribing to multiple roles|http://mesos.apache.org/documentation/latest/roles/]. I believe that Spark could easily be extended to opt-in for this specific capability. >From my understanding, this is the [Spark source >code|https://github.com/apache/spark/blob/fc45c2c88a838b8f46659ebad2a8f3a9923bc95f/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala#L94] > that regulates the subscription to the role. I wonder on whether just passing >a comma-separated list of frameworks (hence, splitting on that string) would >already be sufficient to leverage this capability. Is there any side-effect that this change will cause? was: Mesos offers the capability of [subscribing to multiple roles|http://mesos.apache.org/documentation/latest/roles/]. I believe that Spark could easily be extended to opt-in for this specific capability. >From my understanding, this is the [Spark source >code|https://github.com/apache/spark/blob/fc45c2c88a838b8f46659ebad2a8f3a9923bc95f/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala#L94] > that regulates the subscription to the role. I wonder on whether just passing >a comma-separated list of frameworks (hence, splitting on that string) would >already be sufficient to leverage this capability. > Subscribe to multiple roles in Mesos > > > Key: SPARK-22589 > URL: https://issues.apache.org/jira/browse/SPARK-22589 > Project: Spark > Issue Type: Wish > Components: Spark Core >Affects Versions: 2.1.2, 2.2.0 >Reporter: Fabiano Francesconi > Original Estimate: 48h > Remaining Estimate: 48h > > Mesos offers the capability of [subscribing to multiple > roles|http://mesos.apache.org/documentation/latest/roles/]. I believe that > Spark could easily be extended to opt-in for this specific capability. > From my understanding, this is the [Spark source > code|https://github.com/apache/spark/blob/fc45c2c88a838b8f46659ebad2a8f3a9923bc95f/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala#L94] > that regulates the subscription to the role. I wonder on whether just > passing a comma-separated list of frameworks (hence, splitting on that > string) would already be sufficient to leverage this capability. > Is there any side-effect that this change will cause? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-22574) Wrong request causing Spark Dispatcher going inactive
[ https://issues.apache.org/jira/browse/SPARK-22574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-22574: -- Target Version/s: (was: 2.0.0, 2.1.0, 2.2.0) Fix Version/s: (was: 2.2.0) (was: 2.1.0) (was: 2.0.0) > Wrong request causing Spark Dispatcher going inactive > - > > Key: SPARK-22574 > URL: https://issues.apache.org/jira/browse/SPARK-22574 > Project: Spark > Issue Type: Bug > Components: Mesos, Spark Submit >Affects Versions: 2.2.0 >Reporter: German Schiavon Matteo >Priority: Minor > > When submitting a wrong _CreateSubmissionRequest_ to Spark Dispatcher is > causing a bad state of Dispatcher and making it inactive as a mesos framework. > The class CreateSubmissionRequest initialise its arguments to null as follows: > {code:title=CreateSubmissionRequest.scala|borderStyle=solid} > var appResource: String = null > var mainClass: String = null > var appArgs: Array[String] = null > var sparkProperties: Map[String, String] = null > var environmentVariables: Map[String, String] = null > {code} > There are some checks of these variables but not in all of them, for example > in appArgs and environmentVariables. > If you don't set _appArgs_ it will cause the following error: > {code:title=error|borderStyle=solid} > 17/11/21 14:37:24 INFO MesosClusterScheduler: Reviving Offers. > Exception in thread "Thread-22" java.lang.NullPointerException > at > org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.getDriverCommandValue(MesosClusterScheduler.scala:444) > at > org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.buildDriverCommand(MesosClusterScheduler.scala:451) > at > org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.org$apache$spark$scheduler$cluster$mesos$MesosClusterScheduler$$createTaskInfo(MesosClusterScheduler.scala:538) > at > org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler$$anonfun$scheduleTasks$1.apply(MesosClusterScheduler.scala:570) > at > org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler$$anonfun$scheduleTasks$1.apply(MesosClusterScheduler.scala:555) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.scheduleTasks(MesosClusterScheduler.scala:555) > at > org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.resourceOffers(MesosClusterScheduler.scala:621) > {code} > Because it's trying to access to it without checking whether is null or not. > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
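For context, a hedged, self-contained sketch of the kind of up-front validation being described (field names mirror the snippet above; the class and exception wiring are illustrative, not the actual Spark REST classes):
{code}
// Sketch only: reject a malformed request before it reaches MesosClusterScheduler,
// so a missing appArgs produces a clear error instead of a NullPointerException.
class CreateSubmissionRequestSketch {
  var appResource: String = null
  var mainClass: String = null
  var appArgs: Array[String] = null
  var sparkProperties: Map[String, String] = null
  var environmentVariables: Map[String, String] = null

  private def assertFieldIsSet(value: Any, name: String): Unit = {
    require(value != null, s"The $name field is missing in CreateSubmissionRequest")
  }

  def validate(): Unit = {
    assertFieldIsSet(appResource, "appResource")
    assertFieldIsSet(mainClass, "mainClass")
    assertFieldIsSet(appArgs, "appArgs")                            // unchecked today
    assertFieldIsSet(sparkProperties, "sparkProperties")
    assertFieldIsSet(environmentVariables, "environmentVariables")  // unchecked today
  }
}
{code}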
[jira] [Created] (SPARK-22590) SparkContext's local properties missing from TaskContext properties
Ajith S created SPARK-22590: --- Summary: SparkContext's local properties missing from TaskContext properties Key: SPARK-22590 URL: https://issues.apache.org/jira/browse/SPARK-22590 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.0 Reporter: Ajith S Local properties set via sparkContext are not available as TaskContext properties when executing parallel jobs while thread pools have idle threads. Explanation: When executing parallel jobs via {{BroadcastExchangeExec}} or {{SubqueryExec}}, the {{relationFuture}} is evaluated via a separate thread. The threads inherit the {{localProperties}} from sparkContext as they are the child threads. These threads are controlled via the executionContext (thread pools). Each thread pool has a default {{keepAliveSeconds}} of 60 seconds for idle threads. In scenarios where the thread pool has idle threads that are reused for a subsequent new query, the thread-local properties will not be inherited from the spark context (thread properties are inherited only on thread creation), so the threads end up with old or no properties set. This causes taskset properties to be missing when properties are transferred by the child thread via {{sparkContext.runJob/submitJob}}. Attached is a test case that simulates this behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
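To make the symptom concrete, a hedged sketch (not the attached test case): re-setting a captured local property inside the pooled thread works around the fact that thread-locals are only inherited at thread creation. The property key and pool size are arbitrary examples, and {{sc}} is assumed to be an active SparkContext.
{code}
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// A thread pool whose threads outlive the submitting thread, as with the
// executionContext used by BroadcastExchangeExec / SubqueryExec
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

sc.setLocalProperty("my.tag", "job-42")      // arbitrary example property

// Capture the value in the submitting thread, then re-set it in the pooled thread.
// Without this, an idle reused pool thread keeps whatever properties it inherited
// when it was first created -- possibly none, possibly stale ones.
val tag = sc.getLocalProperty("my.tag")
val result = Future {
  sc.setLocalProperty("my.tag", tag)
  sc.parallelize(1 to 10, 2).map(_ * 2).collect()   // tasks now see "my.tag"
}
{code}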
[jira] [Updated] (SPARK-22590) SparkContext's local properties missing from TaskContext properties
[ https://issues.apache.org/jira/browse/SPARK-22590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ajith S updated SPARK-22590: Attachment: TestProps.scala > SparkContext's local properties missing from TaskContext properties > --- > > Key: SPARK-22590 > URL: https://issues.apache.org/jira/browse/SPARK-22590 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Ajith S > Attachments: TestProps.scala > > > Local properties set via sparkContext are not available as TaskContext > properties when executing parallel jobs and threadpools have idle threads > Explanation: > When executing parallel jobs via {{BroadcastExchangeExec}} or > {{SubqueryExec}}, the {{relationFuture}} is evaluated via a seperate thread. > The threads inherit the {{localProperties}} from sparkContext as they are the > child threads. > These threads are controlled via the executionContext (thread pools). Each > Thread pool has a default {{keepAliveSeconds}} of 60 seconds for idle > threads. > Scenarios where the thread pool has threads which are idle and reused for a > subsequent new query, the thread local properties will not be inherited from > spark context (thread properties are inherited only on thread creation) hence > end up having old or no properties set. This will cause taskset properties to > be missing when properties are transferred by child thread via > {{sparkContext.runJob/submitJob}} > Attached is a test-case to simulate this behavior -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264208#comment-16264208 ] mohamed imran commented on SPARK-22526: --- [~ste...@apache.org] Thanks Steve. I will modify my code as you mentioned in the JIRA and test it. If it works fine, I can keep using that until there is a permanent fix! > Spark hangs while reading binary files from S3 > -- > > Key: SPARK-22526 > URL: https://issues.apache.org/jira/browse/SPARK-22526 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: mohamed imran > Original Estimate: 168h > Remaining Estimate: 168h > > Hi, > I am using Spark 2.2.0(recent version) to read binary files from S3. I use > sc.binaryfiles to read the files. > It is working fine until some 100 file read but later it get hangs > indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in > the later releases) > I tried setting the fs.s3a.connection.maximum to some maximum values but > didn't help. > And finally i ended up using the spark speculation parameter set which is > again didnt help much. > One thing Which I observed is that it is not closing the connection after > every read of binary files from the S3. > example :- sc.binaryFiles("s3a://test/test123.zip") > Please look into this major issue! -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17920) HiveWriterContainer passes null configuration to serde.initialize, causing NullPointerException in AvroSerde when using avro.schema.url
[ https://issues.apache.org/jira/browse/SPARK-17920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264223#comment-16264223 ] Apache Spark commented on SPARK-17920: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/19799 > HiveWriterContainer passes null configuration to serde.initialize, causing > NullPointerException in AvroSerde when using avro.schema.url > --- > > Key: SPARK-17920 > URL: https://issues.apache.org/jira/browse/SPARK-17920 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.2, 2.0.0 > Environment: AWS EMR 5.0.0: Spark 2.0.0, Hive 2.1.0 >Reporter: James Norvell >Assignee: Vinod KC >Priority: Minor > Fix For: 2.2.1, 2.3.0 > > Attachments: avro.avsc, avro_data > > > When HiveWriterContainer intializes a serde it explicitly passes null for the > Configuration: > https://github.com/apache/spark/blob/v2.0.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala#L161 > When attempting to write to a table stored as Avro with avro.schema.url set, > this causes a NullPointerException when it tries to get the FileSystem for > the URL: > https://github.com/apache/hive/blob/release-2.1.0-rc3/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java#L153 > Reproduction: > {noformat} > spark-sql> create external table avro_in (a string) stored as avro location > '/avro-in/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc'); > spark-sql> create external table avro_out (a string) stored as avro location > '/avro-out/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc'); > spark-sql> select * from avro_in; > hello > Time taken: 1.986 seconds, Fetched 1 row(s) > spark-sql> insert overwrite table avro_out select * from avro_in; > 16/10/13 19:34:47 WARN AvroSerDe: Encountered exception determining schema. 
> Returning signal schema to indicate problem > java.lang.NullPointerException > at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:182) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:359) > at > org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.getSchemaFromFS(AvroSerdeUtils.java:131) > at > org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrThrowException(AvroSerdeUtils.java:112) > at > org.apache.hadoop.hive.serde2.avro.AvroSerDe.determineSchemaOrReturnErrorSchema(AvroSerDe.java:167) > at > org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:103) > at > org.apache.spark.sql.hive.SparkHiveWriterContainer.newSerializer(hiveWriterContainers.scala:161) > at > org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:236) > at > org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:142) > at > org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:313) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) > at org.apache.spark.sql.Dataset.(Dataset.scala:186) > at org.apache.spark.sql.Dataset.(Dataset.scala:167) > at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) > at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682) > at > org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62) > at > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:331) > at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376) > at > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:247) > at > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) >
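For context, a hedged sketch of the direction such a fix takes (not necessarily the merged patch): hand the SerDe a real Hadoop Configuration instead of null, so AvroSerdeUtils can resolve {{avro.schema.url}} through {{FileSystem.get}}.
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Serializer

// Sketch of a newSerializer variant that threads the job's Hadoop conf through
def newSerializer(tableDesc: TableDesc, hadoopConf: Configuration): Serializer = {
  val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
  // Previously a null Configuration was passed here, which NPEs inside
  // AvroSerdeUtils.getSchemaFromFS when avro.schema.url is set
  serializer.initialize(hadoopConf, tableDesc.getProperties)
  serializer
}
{code}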
[jira] [Commented] (SPARK-22585) Url encoding of jar path expected?
[ https://issues.apache.org/jira/browse/SPARK-22585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264234#comment-16264234 ] Jakub Dubovsky commented on SPARK-22585: I am not sure about adding encoding step into implementation of addJar method. It's not about encoding whole path as a string since you want to keep some characters literally (':', '/' possibly others). So the code would first need to parse the path to get only path segments and encode those. This most probably leads to using URI again at which point this starts to be circular problem. Moreover I am not sure what is the point of encoding path segments only to ask URI to decode it... I also think that it makes sense to decode the segment only inside of a logic accessing a value of that segment. If I work with url/path as a whole I want to keep it parsable and therefore keep special characters encoded. This is the thinking I would personally use to decide which version (getPath/getRawPath) should be used in particular scenarios across spark code base even though I must admit I have very little insight into these other URI usecases :) > Url encoding of jar path expected? > -- > > Key: SPARK-22585 > URL: https://issues.apache.org/jira/browse/SPARK-22585 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Jakub Dubovsky > > I am calling {code}sparkContext.addJar{code} method with path to a local jar > I want to add. Example: > {code}/home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar{code}. > As a result I get an exception saying > {code} > Failed to add > /home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar to Spark > environment. Stacktrace: > java.io.FileNotFoundException: Jar > /home/me/.coursier/cache/v1/https/artifactory.com:443/path/to.jar not found > {code} > Important part to notice here is that colon character is url encoded in path > I want to use but exception is complaining about path in decoded form. This > is caused by this line of code from implementation ([see > here|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L1833]): > {code} > case null | "file" => addJarFile(new File(uri.getPath)) > {code} > It uses > [getPath|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html#getPath()] > method of > [java.net.URI|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html] > which url decodes the path. I believe method > [getRawPath|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html#getRawPath()] > should be used here which keeps path string in original form. > I tend to see this as a bug since I want to use my dependencies resolved from > artifactory with port directly. Is there some specific reason for this or can > we fix this? > Thanks -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22591) GenerateOrdering shouldn't change ctx.INPUT_ROW
Liang-Chi Hsieh created SPARK-22591: --- Summary: GenerateOrdering shouldn't change ctx.INPUT_ROW Key: SPARK-22591 URL: https://issues.apache.org/jira/browse/SPARK-22591 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: Liang-Chi Hsieh {{GenerateOrdering}} changes {{ctx.INPUT_ROW}} but doesn't restore the original value. Since {{ctx.INPUT_ROW}} is used when generating codes, it is risky to change it arbitrarily. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
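The usual shape of a fix for this is to save and restore the shared codegen state around the block that needs a different input row; a hedged, generic sketch (not the actual patch):
{code}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

// Sketch only: run a block of code generation with a temporary INPUT_ROW, restoring
// the previous value afterwards so other generators sharing the context are unaffected.
def withInputRow[T](ctx: CodegenContext, row: String)(body: => T): T = {
  val oldInputRow = ctx.INPUT_ROW
  val oldCurrentVars = ctx.currentVars
  ctx.INPUT_ROW = row
  ctx.currentVars = null
  try {
    body                              // e.g. the comparison generation in GenerateOrdering
  } finally {
    ctx.INPUT_ROW = oldInputRow       // always restore the shared mutable state
    ctx.currentVars = oldCurrentVars
  }
}
{code}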
[jira] [Commented] (SPARK-22591) GenerateOrdering shouldn't change ctx.INPUT_ROW
[ https://issues.apache.org/jira/browse/SPARK-22591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264254#comment-16264254 ] Apache Spark commented on SPARK-22591: -- User 'viirya' has created a pull request for this issue: https://github.com/apache/spark/pull/19800 > GenerateOrdering shouldn't change ctx.INPUT_ROW > --- > > Key: SPARK-22591 > URL: https://issues.apache.org/jira/browse/SPARK-22591 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > {{GenerateOrdering}} changes {{ctx.INPUT_ROW}} but doesn't restore the > original value. Since {{ctx.INPUT_ROW}} is used when generating codes, it is > risky to change it arbitrarily. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22591) GenerateOrdering shouldn't change ctx.INPUT_ROW
[ https://issues.apache.org/jira/browse/SPARK-22591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22591: Assignee: Apache Spark > GenerateOrdering shouldn't change ctx.INPUT_ROW > --- > > Key: SPARK-22591 > URL: https://issues.apache.org/jira/browse/SPARK-22591 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh >Assignee: Apache Spark > > {{GenerateOrdering}} changes {{ctx.INPUT_ROW}} but doesn't restore the > original value. Since {{ctx.INPUT_ROW}} is used when generating codes, it is > risky to change it arbitrarily. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22591) GenerateOrdering shouldn't change ctx.INPUT_ROW
[ https://issues.apache.org/jira/browse/SPARK-22591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22591: Assignee: (was: Apache Spark) > GenerateOrdering shouldn't change ctx.INPUT_ROW > --- > > Key: SPARK-22591 > URL: https://issues.apache.org/jira/browse/SPARK-22591 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > {{GenerateOrdering}} changes {{ctx.INPUT_ROW}} but doesn't restore the > original value. Since {{ctx.INPUT_ROW}} is used when generating codes, it is > risky to change it arbitrarily. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22592) cleanup filter converting for hive
Wenchen Fan created SPARK-22592: --- Summary: cleanup filter converting for hive Key: SPARK-22592 URL: https://issues.apache.org/jira/browse/SPARK-22592 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Wenchen Fan Assignee: Wenchen Fan Priority: Minor -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22592) cleanup filter converting for hive
[ https://issues.apache.org/jira/browse/SPARK-22592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22592: Assignee: Wenchen Fan (was: Apache Spark) > cleanup filter converting for hive > -- > > Key: SPARK-22592 > URL: https://issues.apache.org/jira/browse/SPARK-22592 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22592) cleanup filter converting for hive
[ https://issues.apache.org/jira/browse/SPARK-22592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22592: Assignee: Apache Spark (was: Wenchen Fan) > cleanup filter converting for hive > -- > > Key: SPARK-22592 > URL: https://issues.apache.org/jira/browse/SPARK-22592 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenchen Fan >Assignee: Apache Spark >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22592) cleanup filter converting for hive
[ https://issues.apache.org/jira/browse/SPARK-22592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264266#comment-16264266 ] Apache Spark commented on SPARK-22592: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/19801 > cleanup filter converting for hive > -- > > Key: SPARK-22592 > URL: https://issues.apache.org/jira/browse/SPARK-22592 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22593) submitMissingTask in DAGScheduler will call the partitions function many times, which may be time consuming
tomzhu created SPARK-22593: -- Summary: submitMissingTask in DAGScheduler will call the partitions function many times, which may be time consuming Key: SPARK-22593 URL: https://issues.apache.org/jira/browse/SPARK-22593 Project: Spark Issue Type: Question Components: Spark Core Affects Versions: 2.2.0 Reporter: tomzhu Priority: Minor When DAGScheduler calls submitMissingTasks, it creates tasks and calls stage.rdd.partitions, which can happen many times and may be time-consuming. The code is: {quote} val tasks: Seq[Task[_]] = try { val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) //here is a little time consuming. val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } } } {quote} For example, for a ParallelCollectionRDD with 3 slices or partitions, the task-creation code will call stage.rdd.partitions three times; since stage.rdd.partitions calls getPartitions, getPartitions would be called three times, which is a little time-consuming. The stage.rdd.partitions code is: {quote} final def partitions: Array[Partition] = { checkpointRDD.map(_.partitions).getOrElse { if (partitions_ == null) { partitions_ = getPartitions partitions_.zipWithIndex.foreach { case (partition, index) => require(partition.index == index, s"partitions($index).partition == ${partition.index}, but it should equal $index") } } partitions_ } } {quote} It would be better to avoid this. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
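As a hedged illustration only (note that {{RDD.partitions}} caches its result in {{partitions_}} after the first call, as the quoted code shows, so repeated calls mostly return the cached array), here is the ResultStage branch with the lookup hoisted out of the per-task loop:
{code}
// Sketch of the ResultStage branch from submitMissingTasks with a single lookup;
// names follow the snippet quoted above.
case stage: ResultStage =>
  val stagePartitions = stage.rdd.partitions      // evaluated once, reused for every task
  partitionsToCompute.map { id =>
    val p: Int = stage.partitions(id)
    val part = stagePartitions(p)
    val locs = taskIdToLocations(id)
    new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id,
      properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId),
      sc.applicationAttemptId)
  }
{code}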
[jira] [Commented] (SPARK-7721) Generate test coverage report from Python
[ https://issues.apache.org/jira/browse/SPARK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264349#comment-16264349 ] Hyukjin Kwon commented on SPARK-7721: - I knew of a similar way but wasn't sure it was the only one, so I was hesitant, but then I found this JIRA. I can give it a shot if using GitHub Pages sounds good to you guys. > Generate test coverage report from Python > - > > Key: SPARK-7721 > URL: https://issues.apache.org/jira/browse/SPARK-7721 > Project: Spark > Issue Type: Test > Components: PySpark, Tests >Reporter: Reynold Xin > > Would be great to have test coverage report for Python. Compared with Scala, > it is tricker to understand the coverage without coverage reports in Python > because we employ both docstring tests and unit tests in test files. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22585) Url encoding of jar path expected?
[ https://issues.apache.org/jira/browse/SPARK-22585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264392#comment-16264392 ] Sean Owen commented on SPARK-22585: --- Yes, I mean escaping the path only; the host and scheme and so on may use reserved characters for their intended usage. I think that's the context here where it's just paths being turned into local file URIs. A URI is the right representation in this code but needs to represent the right path. I don't see any issue with that, nor decoding. It's not round-tripping for nothing. I don't think the representation depends on usage. The URI's representation simply needs to correctly represent the resource. That's not quite happening here, and it's because special chars aren't escaped in the right places. > Url encoding of jar path expected? > -- > > Key: SPARK-22585 > URL: https://issues.apache.org/jira/browse/SPARK-22585 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Jakub Dubovsky > > I am calling {code}sparkContext.addJar{code} method with path to a local jar > I want to add. Example: > {code}/home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar{code}. > As a result I get an exception saying > {code} > Failed to add > /home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar to Spark > environment. Stacktrace: > java.io.FileNotFoundException: Jar > /home/me/.coursier/cache/v1/https/artifactory.com:443/path/to.jar not found > {code} > Important part to notice here is that colon character is url encoded in path > I want to use but exception is complaining about path in decoded form. This > is caused by this line of code from implementation ([see > here|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L1833]): > {code} > case null | "file" => addJarFile(new File(uri.getPath)) > {code} > It uses > [getPath|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html#getPath()] > method of > [java.net.URI|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html] > which url decodes the path. I believe method > [getRawPath|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html#getRawPath()] > should be used here which keeps path string in original form. > I tend to see this as a bug since I want to use my dependencies resolved from > artifactory with port directly. Is there some specific reason for this or can > we fix this? > Thanks -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22584) dataframe write partitionBy out of disk/java heap issues
[ https://issues.apache.org/jira/browse/SPARK-22584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264447#comment-16264447 ] Derek M Miller commented on SPARK-22584: I disagree. I should not be running out of memory for a file that only has 6mb with 5 instances that have 16gb of memory. Even when the data is evenly distributed across partitions, I am still seeing this issue. I posted this on stackoverflow, and it seems like others are experiencing this issue as well https://stackoverflow.com/questions/47382977/spark-2-2-write-partitionby-out-of-memory-exception. > dataframe write partitionBy out of disk/java heap issues > > > Key: SPARK-22584 > URL: https://issues.apache.org/jira/browse/SPARK-22584 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Derek M Miller > > I have been seeing some issues with partitionBy for the dataframe writer. I > currently have a file that is 6mb, just for testing, and it has around 1487 > rows and 21 columns. There is nothing out of the ordinary with the columns, > having either a DoubleType or StringType. The partitionBy calls two different > partitions with verified low cardinality. One partition has 30 unique values > and the other one has 2 unique values. > ```scala > df > .write.partitionBy("first", "second") > .mode(SaveMode.Overwrite) > .parquet(s"$location$example/$corrId/") > ``` > When running this example on Amazon's EMR with 5 r4.xlarges (30 gb of memory > each), I am getting a java heap out of memory error. I have > maximizeResourceAllocation set, and verified on the instances. I have even > set it to false, explicitly set the driver and executor memory to 16g, but > still had the same issue. Occasionally I get an error about disk space, and > the job seems to work if I use an r3.xlarge (that has the ssd). But that > seems weird that 6mb of data needs to spill to disk. > The problem mainly seems to be centered around two + partitions vs 1. If I > just use either of the partitions only, I have no problems. It's also worth > noting that each of the partitions are evenly distributed. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22584) dataframe write partitionBy out of disk/java heap issues
[ https://issues.apache.org/jira/browse/SPARK-22584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264448#comment-16264448 ] Sean Owen commented on SPARK-22584: --- It depends on too many things: what did you transform the data into? did you cache it? how much memory is actually allocated to Spark? driver vs executor? what ran out of memory, where? This is too open ended for a JIRA. > dataframe write partitionBy out of disk/java heap issues > > > Key: SPARK-22584 > URL: https://issues.apache.org/jira/browse/SPARK-22584 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Derek M Miller > > I have been seeing some issues with partitionBy for the dataframe writer. I > currently have a file that is 6mb, just for testing, and it has around 1487 > rows and 21 columns. There is nothing out of the ordinary with the columns, > having either a DoubleType or StringType. The partitionBy calls two different > partitions with verified low cardinality. One partition has 30 unique values > and the other one has 2 unique values. > ```scala > df > .write.partitionBy("first", "second") > .mode(SaveMode.Overwrite) > .parquet(s"$location$example/$corrId/") > ``` > When running this example on Amazon's EMR with 5 r4.xlarges (30 gb of memory > each), I am getting a java heap out of memory error. I have > maximizeResourceAllocation set, and verified on the instances. I have even > set it to false, explicitly set the driver and executor memory to 16g, but > still had the same issue. Occasionally I get an error about disk space, and > the job seems to work if I use an r3.xlarge (that has the ssd). But that > seems weird that 6mb of data needs to spill to disk. > The problem mainly seems to be centered around two + partitions vs 1. If I > just use either of the partitions only, I have no problems. It's also worth > noting that each of the partitions are evenly distributed. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22585) Url encoding of jar path expected?
[ https://issues.apache.org/jira/browse/SPARK-22585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264456#comment-16264456 ] Jakub Dubovsky commented on SPARK-22585: I just tried how URI behaves on some examples and learned that it behaves differently than I thought. So yes, either encoding the path or using getRawPath are both valid solutions to me. Should I create a PR for this, or what is the next step? > Url encoding of jar path expected? > -- > > Key: SPARK-22585 > URL: https://issues.apache.org/jira/browse/SPARK-22585 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Jakub Dubovsky > > I am calling {code}sparkContext.addJar{code} method with path to a local jar > I want to add. Example: > {code}/home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar{code}. > As a result I get an exception saying > {code} > Failed to add > /home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar to Spark > environment. Stacktrace: > java.io.FileNotFoundException: Jar > /home/me/.coursier/cache/v1/https/artifactory.com:443/path/to.jar not found > {code} > Important part to notice here is that colon character is url encoded in path > I want to use but exception is complaining about path in decoded form. This > is caused by this line of code from implementation ([see > here|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L1833]): > {code} > case null | "file" => addJarFile(new File(uri.getPath)) > {code} > It uses > [getPath|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html#getPath()] > method of > [java.net.URI|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html] > which url decodes the path. I believe method > [getRawPath|https://docs.oracle.com/javase/7/docs/api/java/net/URI.html#getRawPath()] > should be used here which keeps path string in original form. > I tend to see this as a bug since I want to use my dependencies resolved from > artifactory with port directly. Is there some specific reason for this or can > we fix this? > Thanks -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
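For reference, a quick illustration (not taken from the ticket itself) of the getPath vs getRawPath behaviour discussed above, using the jar path from this report; any Scala REPL or spark-shell will do:
{code}
// java.net.URI percent-decodes getPath but returns getRawPath exactly as given.
val uri = new java.net.URI("file:///home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar")
uri.getPath     // "/home/me/.coursier/cache/v1/https/artifactory.com:443/path/to.jar"   (decoded form)
uri.getRawPath  // "/home/me/.coursier/cache/v1/https/artifactory.com%3A443/path/to.jar" (original form)
{code}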
[jira] [Commented] (SPARK-22584) dataframe write partitionBy out of disk/java heap issues
[ https://issues.apache.org/jira/browse/SPARK-22584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264475#comment-16264475 ] Derek M Miller commented on SPARK-22584: So, in this case, the executor and driver were both given 16g of memory (i.e. --driver-memory 16g --executor-memory 16g). The dataframe was loaded from parquet. If I save the dataframe as is with no partitions, I don't have any issues. If I save it with one partition, same thing. However, adding the second partition causes the job to need to write to disk. Every once in a while the error is in the driver. However, I mostly see it in an executor (it isn't consistent). It ran out of memory in the middle of the partitionBy. It seemed to write a couple of partitions, then fail in the middle of the action. > dataframe write partitionBy out of disk/java heap issues > > > Key: SPARK-22584 > URL: https://issues.apache.org/jira/browse/SPARK-22584 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Derek M Miller > > I have been seeing some issues with partitionBy for the dataframe writer. I > currently have a file that is 6mb, just for testing, and it has around 1487 > rows and 21 columns. There is nothing out of the ordinary with the columns, > having either a DoubleType or StringType. The partitionBy calls two different > partitions with verified low cardinality. One partition has 30 unique values > and the other one has 2 unique values. > ```scala > df > .write.partitionBy("first", "second") > .mode(SaveMode.Overwrite) > .parquet(s"$location$example/$corrId/") > ``` > When running this example on Amazon's EMR with 5 r4.xlarges (30 gb of memory > each), I am getting a java heap out of memory error. I have > maximizeResourceAllocation set, and verified on the instances. I have even > set it to false, explicitly set the driver and executor memory to 16g, but > still had the same issue. Occasionally I get an error about disk space, and > the job seems to work if I use an r3.xlarge (that has the ssd). But that > seems weird that 6mb of data needs to spill to disk. > The problem mainly seems to be centered around two + partitions vs 1. If I > just use either of the partitions only, I have no problems. It's also worth > noting that each of the partitions are evenly distributed. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
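A workaround often suggested for this kind of dynamic-partition write, sketched here with the column names and path from the report above (it is an assumption, not a confirmed fix for this ticket, that it helps in this particular case), is to repartition by the partition columns first so each task only holds writers for a few partition directories at a time:
{code}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Repartitioning by the partition columns bounds the number of concurrently
// open Parquet writers (and their buffers) per task during the partitionBy write.
df.repartition(col("first"), col("second"))
  .write
  .partitionBy("first", "second")
  .mode(SaveMode.Overwrite)
  .parquet(s"$location$example/$corrId/")
{code}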
[jira] [Comment Edited] (SPARK-22526) Spark hangs while reading binary files from S3
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264095#comment-16264095 ] Steve Loughran edited comment on SPARK-22526 at 11/23/17 3:47 PM: -- # Fix the code you invoke # Wrap the code you invoke in something like the following (only sketched here in the JIRA, untested, and it should really close the stream in a way that swallows IOEs): {code} binaryRdd.map { t => try { process(t._2) } finally { t._2.close() } } {code} was (Author: ste...@apache.org): # Fix the code you invoke #. wrap the code you invoke with something like (and this is coded in the JIRA, untested & should really close the stream in something to swallow IOEs. {code} binaryRdd.map { t => try { process(t._2) } finally { t._2.close() } } {code} > Spark hangs while reading binary files from S3 > -- > > Key: SPARK-22526 > URL: https://issues.apache.org/jira/browse/SPARK-22526 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: mohamed imran > Original Estimate: 168h > Remaining Estimate: 168h > > Hi, > I am using Spark 2.2.0(recent version) to read binary files from S3. I use > sc.binaryfiles to read the files. > It is working fine until some 100 file read but later it get hangs > indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in > the later releases) > I tried setting the fs.s3a.connection.maximum to some maximum values but > didn't help. > And finally i ended up using the spark speculation parameter set which is > again didnt help much. > One thing Which I observed is that it is not closing the connection after > every read of binary files from the S3. > example :- sc.binaryFiles("s3a://test/test123.zip") > Please look into this major issue! -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
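A hedged variant of the sketch above that opens the PortableDataStream explicitly and swallows any IOException thrown while closing; process is the caller's own function and is assumed here to accept a DataInputStream:
{code}
import java.io.IOException

// binaryRdd: RDD[(String, PortableDataStream)] as returned by sc.binaryFiles(...)
binaryRdd.map { case (path, pds) =>
  val in = pds.open()                  // open the underlying DataInputStream
  try {
    process(in)                        // caller-supplied processing (assumption)
  } finally {
    try { in.close() }                 // always release the connection
    catch { case _: IOException => }   // swallow failures from close() itself
  }
}
{code}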
[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264496#comment-16264496 ] Steve Loughran commented on SPARK-22526: I'm not giving a permanent fix. It's a bug in your code, or in the code you are invoking, which "forgets to close the input stream". Unless [~srowen] has other ideas, I'd recommend closing this as one of {WORKSFORME, WONTFIX or INVALID} > Spark hangs while reading binary files from S3 > -- > > Key: SPARK-22526 > URL: https://issues.apache.org/jira/browse/SPARK-22526 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: mohamed imran > Original Estimate: 168h > Remaining Estimate: 168h > > Hi, > I am using Spark 2.2.0(recent version) to read binary files from S3. I use > sc.binaryfiles to read the files. > It is working fine until some 100 file read but later it get hangs > indefinitely from 5 up to 40 mins like Avro file read issue(it was fixed in > the later releases) > I tried setting the fs.s3a.connection.maximum to some maximum values but > didn't help. > And finally i ended up using the spark speculation parameter set which is > again didnt help much. > One thing Which I observed is that it is not closing the connection after > every read of binary files from the S3. > example :- sc.binaryFiles("s3a://test/test123.zip") > Please look into this major issue! -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22582) Spark SQL round throws error with negative precision
[ https://issues.apache.org/jira/browse/SPARK-22582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264582#comment-16264582 ] Marco Gaido commented on SPARK-22582: - I tried to run {code} spark.sql("select round(100.1 , 1) as c3, round(100.1 , -1) as c5").show {code} on branch master and it works. Could you try to reproduce the error on a newer Spark version? > Spark SQL round throws error with negative precision > > > Key: SPARK-22582 > URL: https://issues.apache.org/jira/browse/SPARK-22582 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Yuxin Cao > > select round(100.1 , 1) as c3, > round(100.1 , -1) as c5 from orders; > Error: java.lang.IllegalArgumentException: Error: name expected at the > position 10 of 'decimal(4,-1)' but '-' is found. (state=,code=0) > The same query works fine in Spark 1.6. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22594) Handling spark-submit and master version mismatch
Jiri Kremser created SPARK-22594: Summary: Handling spark-submit and master version mismatch Key: SPARK-22594 URL: https://issues.apache.org/jira/browse/SPARK-22594 Project: Spark Issue Type: Bug Components: Spark Core, Spark Shell, Spark Submit Affects Versions: 2.2.0, 2.1.0 Reporter: Jiri Kremser Priority: Minor When using spark-submit in different version than the remote Spark master, the execution fails on during the message deserialization with this log entry / exception: {code} Error while invoking RpcHandler#receive() for one-way message. java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local class incompatible: stream classdesc serialVersionUID = 1835832137613908542, local class serialVersionUID = -1329125091869941550 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108) at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:271) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:320) at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:270) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:269) at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:604) at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:655) at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:647) at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:209) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:114) at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) ... {code} This is quite ok and it can be read as version mismatch between the client and server, however there is no such a message on the client (spark-submit) side, so if the submitter doesn't have an access to the spark master or spark UI, there is no way to figure out what is wrong. I propose sending a {{RpcFailure}} message back from server to client with some more informative error. I'd use the {{OneWayMessage}} instead of {{RpcFailure}}, because there was no counterpart {{RpcRequest}}, but I had no luck sending it using the {{reverseClient.send()}}. I think some internal protocol is assumed when sending messages server2client. I have a patch prepared. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22595) flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes beyond 64KB
[ https://issues.apache.org/jira/browse/SPARK-22595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264589#comment-16264589 ] Wenchen Fan commented on SPARK-22595: - cc [~kiszk] can you take a look? We may need to reduce the number of fields. > flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes > beyond 64KB > > > Key: SPARK-22595 > URL: https://issues.apache.org/jira/browse/SPARK-22595 > Project: Spark > Issue Type: Test > Components: SQL >Affects Versions: 2.2.1, 2.3.0 >Reporter: Wenchen Fan >Assignee: Kazuaki Ishizaki > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22595) flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes beyond 64KB
Wenchen Fan created SPARK-22595: --- Summary: flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes beyond 64KB Key: SPARK-22595 URL: https://issues.apache.org/jira/browse/SPARK-22595 Project: Spark Issue Type: Test Components: SQL Affects Versions: 2.2.1, 2.3.0 Reporter: Wenchen Fan Assignee: Kazuaki Ishizaki -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22594) Handling spark-submit and master version mismatch
[ https://issues.apache.org/jira/browse/SPARK-22594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264596#comment-16264596 ] Jiri Kremser commented on SPARK-22594: -- PR here: https://github.com/apache/spark/pull/19802 it's still WIP if this is a wanted change, I am prepared to add also tests. > Handling spark-submit and master version mismatch > - > > Key: SPARK-22594 > URL: https://issues.apache.org/jira/browse/SPARK-22594 > Project: Spark > Issue Type: Bug > Components: Spark Core, Spark Shell, Spark Submit >Affects Versions: 2.1.0, 2.2.0 >Reporter: Jiri Kremser >Priority: Minor > > When using spark-submit in different version than the remote Spark master, > the execution fails on during the message deserialization with this log entry > / exception: > {code} > Error while invoking RpcHandler#receive() for one-way message. > java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local > class incompatible: stream classdesc serialVersionUID = 1835832137613908542, > local class serialVersionUID = -1329125091869941550 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > at > org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) > at > org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108) > at > org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:271) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:320) > at > org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:270) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:269) > at > org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:604) > at > org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:655) > at > org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:647) > at > org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:209) > at > org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:114) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) > ... 
> {code} > This is quite ok and it can be read as version mismatch between the client > and server, however there is no such a message on the client (spark-submit) > side, so if the submitter doesn't have an access to the spark master or spark > UI, there is no way to figure out what is wrong. > I propose sending a {{RpcFailure}} message back from server to client with > some more informative error. I'd use the {{OneWayMessage}} instead of > {{RpcFailure}}, because there was no counterpart {{RpcRequest}}, but I had no > luck sending it using the {{reverseClient.send()}}. I think some internal > protocol is assumed when sending messages server2client. > I have a patch prepared. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22594) Handling spark-submit and master version mismatch
[ https://issues.apache.org/jira/browse/SPARK-22594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22594: Assignee: Apache Spark > Handling spark-submit and master version mismatch > - > > Key: SPARK-22594 > URL: https://issues.apache.org/jira/browse/SPARK-22594 > Project: Spark > Issue Type: Bug > Components: Spark Core, Spark Shell, Spark Submit >Affects Versions: 2.1.0, 2.2.0 >Reporter: Jiri Kremser >Assignee: Apache Spark >Priority: Minor > > When using spark-submit in different version than the remote Spark master, > the execution fails on during the message deserialization with this log entry > / exception: > {code} > Error while invoking RpcHandler#receive() for one-way message. > java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local > class incompatible: stream classdesc serialVersionUID = 1835832137613908542, > local class serialVersionUID = -1329125091869941550 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > at > org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) > at > org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108) > at > org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:271) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:320) > at > org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:270) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:269) > at > org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:604) > at > org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:655) > at > org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:647) > at > org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:209) > at > org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:114) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) > ... > {code} > This is quite ok and it can be read as version mismatch between the client > and server, however there is no such a message on the client (spark-submit) > side, so if the submitter doesn't have an access to the spark master or spark > UI, there is no way to figure out what is wrong. 
> I propose sending a {{RpcFailure}} message back from server to client with > some more informative error. I'd use the {{OneWayMessage}} instead of > {{RpcFailure}}, because there was no counterpart {{RpcRequest}}, but I had no > luck sending it using the {{reverseClient.send()}}. I think some internal > protocol is assumed when sending messages server2client. > I have a patch prepared. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22594) Handling spark-submit and master version mismatch
[ https://issues.apache.org/jira/browse/SPARK-22594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22594: Assignee: (was: Apache Spark) > Handling spark-submit and master version mismatch > - > > Key: SPARK-22594 > URL: https://issues.apache.org/jira/browse/SPARK-22594 > Project: Spark > Issue Type: Bug > Components: Spark Core, Spark Shell, Spark Submit >Affects Versions: 2.1.0, 2.2.0 >Reporter: Jiri Kremser >Priority: Minor > > When using spark-submit in different version than the remote Spark master, > the execution fails on during the message deserialization with this log entry > / exception: > {code} > Error while invoking RpcHandler#receive() for one-way message. > java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local > class incompatible: stream classdesc serialVersionUID = 1835832137613908542, > local class serialVersionUID = -1329125091869941550 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > at > org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) > at > org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108) > at > org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:271) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:320) > at > org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:270) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:269) > at > org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:604) > at > org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:655) > at > org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:647) > at > org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:209) > at > org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:114) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) > ... > {code} > This is quite ok and it can be read as version mismatch between the client > and server, however there is no such a message on the client (spark-submit) > side, so if the submitter doesn't have an access to the spark master or spark > UI, there is no way to figure out what is wrong. > I propose sending a {{RpcFailure}} message back from server to client with > some more informative error. 
I'd use the {{OneWayMessage}} instead of > {{RpcFailure}}, because there was no counterpart {{RpcRequest}}, but I had no > luck sending it using the {{reverseClient.send()}}. I think some internal > protocol is assumed when sending messages server2client. > I have a patch prepared. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22594) Handling spark-submit and master version mismatch
[ https://issues.apache.org/jira/browse/SPARK-22594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264597#comment-16264597 ] Apache Spark commented on SPARK-22594: -- User 'Jiri-Kremser' has created a pull request for this issue: https://github.com/apache/spark/pull/19802 > Handling spark-submit and master version mismatch > - > > Key: SPARK-22594 > URL: https://issues.apache.org/jira/browse/SPARK-22594 > Project: Spark > Issue Type: Bug > Components: Spark Core, Spark Shell, Spark Submit >Affects Versions: 2.1.0, 2.2.0 >Reporter: Jiri Kremser >Priority: Minor > > When using spark-submit in different version than the remote Spark master, > the execution fails on during the message deserialization with this log entry > / exception: > {code} > Error while invoking RpcHandler#receive() for one-way message. > java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local > class incompatible: stream classdesc serialVersionUID = 1835832137613908542, > local class serialVersionUID = -1329125091869941550 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > at > org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) > at > org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108) > at > org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:271) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:320) > at > org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:270) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:269) > at > org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:604) > at > org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:655) > at > org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:647) > at > org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:209) > at > org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:114) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) > ... 
> {code} > This is quite ok and it can be read as version mismatch between the client > and server, however there is no such a message on the client (spark-submit) > side, so if the submitter doesn't have an access to the spark master or spark > UI, there is no way to figure out what is wrong. > I propose sending a {{RpcFailure}} message back from server to client with > some more informative error. I'd use the {{OneWayMessage}} instead of > {{RpcFailure}}, because there was no counterpart {{RpcRequest}}, but I had no > luck sending it using the {{reverseClient.send()}}. I think some internal > protocol is assumed when sending messages server2client. > I have a patch prepared. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22596) set ctx.currentVars in CodegenSupport.consume
Wenchen Fan created SPARK-22596: --- Summary: set ctx.currentVars in CodegenSupport.consume Key: SPARK-22596 URL: https://issues.apache.org/jira/browse/SPARK-22596 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Wenchen Fan Assignee: Wenchen Fan -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22596) set ctx.currentVars in CodegenSupport.consume
[ https://issues.apache.org/jira/browse/SPARK-22596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22596: Assignee: Wenchen Fan (was: Apache Spark) > set ctx.currentVars in CodegenSupport.consume > - > > Key: SPARK-22596 > URL: https://issues.apache.org/jira/browse/SPARK-22596 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22596) set ctx.currentVars in CodegenSupport.consume
[ https://issues.apache.org/jira/browse/SPARK-22596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264615#comment-16264615 ] Apache Spark commented on SPARK-22596: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/19803 > set ctx.currentVars in CodegenSupport.consume > - > > Key: SPARK-22596 > URL: https://issues.apache.org/jira/browse/SPARK-22596 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22596) set ctx.currentVars in CodegenSupport.consume
[ https://issues.apache.org/jira/browse/SPARK-22596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22596: Assignee: Apache Spark (was: Wenchen Fan) > set ctx.currentVars in CodegenSupport.consume > - > > Key: SPARK-22596 > URL: https://issues.apache.org/jira/browse/SPARK-22596 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenchen Fan >Assignee: Apache Spark > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22573) SQL Planner is including unnecessary columns in the projection
[ https://issues.apache.org/jira/browse/SPARK-22573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22573: Assignee: Apache Spark > SQL Planner is including unnecessary columns in the projection > -- > > Key: SPARK-22573 > URL: https://issues.apache.org/jira/browse/SPARK-22573 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Rajkishore Hembram >Assignee: Apache Spark > > While I was running TPC-H query 18 for benchmarking, I observed that the > query plan for Apache Spark 2.2.0 is inefficient than other versions of > Apache Spark. I noticed that the other versions of Apache Spark (2.0.2 and > 2.1.2) are only including the required columns in the projections. But the > query planner of Apache Spark 2.2.0 is including unnecessary columns into the > projection for some of the queries and hence unnecessarily increasing the > I/O. And because of that the Apache Spark 2.2.0 is taking more time. > [Spark 2.1.2 TPC-H Query 18 > Plan|https://drive.google.com/file/d/1_u8nPKG_SIM7P6fs0VK-8UEXIhWPY_BN/view] > [Spark 2.2.0 TPC-H Query 18 > Plan|https://drive.google.com/file/d/1xtxG5Ext36djfTDSdf_W5vGbbdgRApPo/view] > TPC-H Query 18 > {code:java} > select C_NAME,C_CUSTKEY,O_ORDERKEY,O_ORDERDATE,O_TOTALPRICE,sum(L_QUANTITY) > from CUSTOMER,ORDERS,LINEITEM where O_ORDERKEY in ( select L_ORDERKEY from > LINEITEM group by L_ORDERKEY having sum(L_QUANTITY) > 300 ) and C_CUSTKEY = > O_CUSTKEY and O_ORDERKEY = L_ORDERKEY group by > C_NAME,C_CUSTKEY,O_ORDERKEY,O_ORDERDATE,O_TOTALPRICE order by O_TOTALPRICE > desc,O_ORDERDATE > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22573) SQL Planner is including unnecessary columns in the projection
[ https://issues.apache.org/jira/browse/SPARK-22573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264617#comment-16264617 ] Apache Spark commented on SPARK-22573: -- User 'wangyum' has created a pull request for this issue: https://github.com/apache/spark/pull/19804 > SQL Planner is including unnecessary columns in the projection > -- > > Key: SPARK-22573 > URL: https://issues.apache.org/jira/browse/SPARK-22573 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Rajkishore Hembram > > While I was running TPC-H query 18 for benchmarking, I observed that the > query plan for Apache Spark 2.2.0 is inefficient than other versions of > Apache Spark. I noticed that the other versions of Apache Spark (2.0.2 and > 2.1.2) are only including the required columns in the projections. But the > query planner of Apache Spark 2.2.0 is including unnecessary columns into the > projection for some of the queries and hence unnecessarily increasing the > I/O. And because of that the Apache Spark 2.2.0 is taking more time. > [Spark 2.1.2 TPC-H Query 18 > Plan|https://drive.google.com/file/d/1_u8nPKG_SIM7P6fs0VK-8UEXIhWPY_BN/view] > [Spark 2.2.0 TPC-H Query 18 > Plan|https://drive.google.com/file/d/1xtxG5Ext36djfTDSdf_W5vGbbdgRApPo/view] > TPC-H Query 18 > {code:java} > select C_NAME,C_CUSTKEY,O_ORDERKEY,O_ORDERDATE,O_TOTALPRICE,sum(L_QUANTITY) > from CUSTOMER,ORDERS,LINEITEM where O_ORDERKEY in ( select L_ORDERKEY from > LINEITEM group by L_ORDERKEY having sum(L_QUANTITY) > 300 ) and C_CUSTKEY = > O_CUSTKEY and O_ORDERKEY = L_ORDERKEY group by > C_NAME,C_CUSTKEY,O_ORDERKEY,O_ORDERDATE,O_TOTALPRICE order by O_TOTALPRICE > desc,O_ORDERDATE > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22573) SQL Planner is including unnecessary columns in the projection
[ https://issues.apache.org/jira/browse/SPARK-22573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22573: Assignee: (was: Apache Spark) > SQL Planner is including unnecessary columns in the projection > -- > > Key: SPARK-22573 > URL: https://issues.apache.org/jira/browse/SPARK-22573 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Rajkishore Hembram > > While I was running TPC-H query 18 for benchmarking, I observed that the > query plan for Apache Spark 2.2.0 is inefficient than other versions of > Apache Spark. I noticed that the other versions of Apache Spark (2.0.2 and > 2.1.2) are only including the required columns in the projections. But the > query planner of Apache Spark 2.2.0 is including unnecessary columns into the > projection for some of the queries and hence unnecessarily increasing the > I/O. And because of that the Apache Spark 2.2.0 is taking more time. > [Spark 2.1.2 TPC-H Query 18 > Plan|https://drive.google.com/file/d/1_u8nPKG_SIM7P6fs0VK-8UEXIhWPY_BN/view] > [Spark 2.2.0 TPC-H Query 18 > Plan|https://drive.google.com/file/d/1xtxG5Ext36djfTDSdf_W5vGbbdgRApPo/view] > TPC-H Query 18 > {code:java} > select C_NAME,C_CUSTKEY,O_ORDERKEY,O_ORDERDATE,O_TOTALPRICE,sum(L_QUANTITY) > from CUSTOMER,ORDERS,LINEITEM where O_ORDERKEY in ( select L_ORDERKEY from > LINEITEM group by L_ORDERKEY having sum(L_QUANTITY) > 300 ) and C_CUSTKEY = > O_CUSTKEY and O_ORDERKEY = L_ORDERKEY group by > C_NAME,C_CUSTKEY,O_ORDERKEY,O_ORDERDATE,O_TOTALPRICE order by O_TOTALPRICE > desc,O_ORDERDATE > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
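As a side note, one way to verify which columns a given Spark version actually keeps in the projection for this query is to print the optimized plan and inspect the Project nodes; tpchQuery18 below is a hypothetical value holding the TPC-H query 18 text quoted above:
{code}
// Compare projections across versions by looking at the optimized plan.
val df = spark.sql(tpchQuery18)                       // tpchQuery18: the SQL string above (assumed)
println(df.queryExecution.optimizedPlan.treeString)   // check Project nodes for extra columns
df.explain(true)                                      // parsed, analyzed, optimized and physical plans
{code}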
[jira] [Commented] (SPARK-22561) Dynamically update topics list for spark kafka consumer
[ https://issues.apache.org/jira/browse/SPARK-22561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264643#comment-16264643 ] Cody Koeninger commented on SPARK-22561: See SubscribePattern http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#consumerstrategies > Dynamically update topics list for spark kafka consumer > --- > > Key: SPARK-22561 > URL: https://issues.apache.org/jira/browse/SPARK-22561 > Project: Spark > Issue Type: New Feature > Components: DStreams >Affects Versions: 2.1.0, 2.1.1, 2.2.0 >Reporter: Arun > > The Spark Streaming application should allow to add new topic after streaming > context is intialized and DStream is started. This is very useful feature > specially when business is working multi geography or multi business units. > For example initially I have spark-kakfa consumer listening for topics: > ["topic-1"."topic-2"] and after couple of days I have added new topics to > kafka ["topic-3","topic-4"], now is there a way to update spark-kafka > consumer topics list and ask spark-kafka consumer to consume data for updated > list of topics without stopping sparkStreaming application or sparkStreaming > context. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
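To make that pointer concrete, a minimal sketch of the SubscribePattern consumer strategy from the 0-10 integration; the broker address, group id, regex and the ssc StreamingContext are placeholders, not values from this ticket:
{code}
import java.util.regex.Pattern

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",        // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group"                 // placeholder
)

// Every topic matching the regex is consumed; matching topics created later are
// picked up when the underlying consumer refreshes its metadata.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,                                          // existing StreamingContext (assumed)
  PreferConsistent,
  SubscribePattern[String, String](Pattern.compile("topic-.*"), kafkaParams)
)
{code}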
[jira] [Resolved] (SPARK-22561) Dynamically update topics list for spark kafka consumer
[ https://issues.apache.org/jira/browse/SPARK-22561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger resolved SPARK-22561. Resolution: Not A Problem > Dynamically update topics list for spark kafka consumer > --- > > Key: SPARK-22561 > URL: https://issues.apache.org/jira/browse/SPARK-22561 > Project: Spark > Issue Type: New Feature > Components: DStreams >Affects Versions: 2.1.0, 2.1.1, 2.2.0 >Reporter: Arun > > The Spark Streaming application should allow to add new topic after streaming > context is intialized and DStream is started. This is very useful feature > specially when business is working multi geography or multi business units. > For example initially I have spark-kakfa consumer listening for topics: > ["topic-1"."topic-2"] and after couple of days I have added new topics to > kafka ["topic-3","topic-4"], now is there a way to update spark-kafka > consumer topics list and ask spark-kafka consumer to consume data for updated > list of topics without stopping sparkStreaming application or sparkStreaming > context. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22595) flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes beyond 64KB
[ https://issues.apache.org/jira/browse/SPARK-22595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264663#comment-16264663 ] Kazuaki Ishizaki commented on SPARK-22595: -- I see. I will look at this. > flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes > beyond 64KB > > > Key: SPARK-22595 > URL: https://issues.apache.org/jira/browse/SPARK-22595 > Project: Spark > Issue Type: Test > Components: SQL >Affects Versions: 2.2.1, 2.3.0 >Reporter: Wenchen Fan >Assignee: Kazuaki Ishizaki > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22595) flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes beyond 64KB
[ https://issues.apache.org/jira/browse/SPARK-22595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264725#comment-16264725 ] Apache Spark commented on SPARK-22595: -- User 'kiszk' has created a pull request for this issue: https://github.com/apache/spark/pull/19806 > flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes > beyond 64KB > > > Key: SPARK-22595 > URL: https://issues.apache.org/jira/browse/SPARK-22595 > Project: Spark > Issue Type: Test > Components: SQL >Affects Versions: 2.2.1, 2.3.0 >Reporter: Wenchen Fan >Assignee: Kazuaki Ishizaki > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22595) flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes beyond 64KB
[ https://issues.apache.org/jira/browse/SPARK-22595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22595: Assignee: Kazuaki Ishizaki (was: Apache Spark) > flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes > beyond 64KB > > > Key: SPARK-22595 > URL: https://issues.apache.org/jira/browse/SPARK-22595 > Project: Spark > Issue Type: Test > Components: SQL >Affects Versions: 2.2.1, 2.3.0 >Reporter: Wenchen Fan >Assignee: Kazuaki Ishizaki > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22595) flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes beyond 64KB
[ https://issues.apache.org/jira/browse/SPARK-22595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22595: Assignee: Apache Spark (was: Kazuaki Ishizaki) > flaky test: CastSuite.SPARK-22500: cast for struct should not generate codes > beyond 64KB > > > Key: SPARK-22595 > URL: https://issues.apache.org/jira/browse/SPARK-22595 > Project: Spark > Issue Type: Test > Components: SQL >Affects Versions: 2.2.1, 2.3.0 >Reporter: Wenchen Fan >Assignee: Apache Spark > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22500) 64KB JVM bytecode limit problem with cast
[ https://issues.apache.org/jira/browse/SPARK-22500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264726#comment-16264726 ] Apache Spark commented on SPARK-22500: -- User 'kiszk' has created a pull request for this issue: https://github.com/apache/spark/pull/19806 > 64KB JVM bytecode limit problem with cast > - > > Key: SPARK-22500 > URL: https://issues.apache.org/jira/browse/SPARK-22500 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Kazuaki Ishizaki >Assignee: Kazuaki Ishizaki > Fix For: 2.2.1, 2.3.0 > > > {{Cast}} can throw an exception due to the 64KB JVM bytecode limit when they > use with a lot of structure fields -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
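For readers unfamiliar with the failure mode, a rough reproduction sketch of a cast over a very wide struct; the field count and names are arbitrary and not taken from the ticket:
{code}
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Build one struct column with many fields, then cast it field-by-field to strings.
// With enough fields the generated cast code used to exceed the 64KB method limit.
val n = 500  // arbitrary; the real threshold depends on generated code size
val wide = struct((1 to n).map(i => lit(i).as(s"f$i")): _*)
val df = spark.range(1).select(wide.as("s"))
val target = StructType((1 to n).map(i => StructField(s"f$i", StringType)))
df.select(col("s").cast(target)).show()
{code}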
[jira] [Commented] (SPARK-22495) Fix setup of SPARK_HOME variable on Windows
[ https://issues.apache.org/jira/browse/SPARK-22495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264727#comment-16264727 ] Apache Spark commented on SPARK-22495: -- User 'jsnowacki' has created a pull request for this issue: https://github.com/apache/spark/pull/19807 > Fix setup of SPARK_HOME variable on Windows > --- > > Key: SPARK-22495 > URL: https://issues.apache.org/jira/browse/SPARK-22495 > Project: Spark > Issue Type: Bug > Components: PySpark, Windows >Affects Versions: 2.3.0 >Reporter: Hyukjin Kwon >Assignee: Jakub Nowacki >Priority: Minor > Fix For: 2.3.0 > > > On Windows, pip installed pyspark is unable to find out the spark home. There > is already proposed change, sufficient details and discussions in > https://github.com/apache/spark/pull/19370 and SPARK-18136 -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-22592) cleanup filter converting for hive
[ https://issues.apache.org/jira/browse/SPARK-22592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-22592. - Resolution: Fixed Fix Version/s: 2.3.0 > cleanup filter converting for hive > -- > > Key: SPARK-22592 > URL: https://issues.apache.org/jira/browse/SPARK-22592 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Minor > Fix For: 2.3.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18084) write.partitionBy() does not recognize nested columns that select() can access
[ https://issues.apache.org/jira/browse/SPARK-18084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264864#comment-16264864 ] Yassine Labidi commented on SPARK-18084: What's the status on this issue ? > write.partitionBy() does not recognize nested columns that select() can access > -- > > Key: SPARK-18084 > URL: https://issues.apache.org/jira/browse/SPARK-18084 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1 >Reporter: Nicholas Chammas >Priority: Minor > > Here's a simple repro in the PySpark shell: > {code} > from pyspark.sql import Row > rdd = spark.sparkContext.parallelize([Row(a=Row(b=5))]) > df = spark.createDataFrame(rdd) > df.printSchema() > df.select('a.b').show() # works > df.write.partitionBy('a.b').text('/tmp/test') # doesn't work > {code} > Here's what I see when I run this: > {code} > >>> from pyspark.sql import Row > >>> rdd = spark.sparkContext.parallelize([Row(a=Row(b=5))]) > >>> df = spark.createDataFrame(rdd) > >>> df.printSchema() > root > |-- a: struct (nullable = true) > ||-- b: long (nullable = true) > >>> df.show() > +---+ > | a| > +---+ > |[5]| > +---+ > >>> df.select('a.b').show() > +---+ > | b| > +---+ > | 5| > +---+ > >>> df.write.partitionBy('a.b').text('/tmp/test') > Traceback (most recent call last): > File > "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/sql/utils.py", > line 63, in deco > return f(*a, **kw) > File > "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", > line 319, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o233.text. > : org.apache.spark.sql.AnalysisException: Partition column a.b not found in > schema > StructType(StructField(a,StructType(StructField(b,LongType,true)),true)); > at > org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$10.apply(PartitioningUtils.scala:368) > at > org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$10.apply(PartitioningUtils.scala:368) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:367) > at > org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:366) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.execution.datasources.PartitioningUtils$.partitionColumnsSchema(PartitioningUtils.scala:366) > at > org.apache.spark.sql.execution.datasources.PartitioningUtils$.validatePartitionColumn(PartitioningUtils.scala:349) > at > org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:458) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) > at 
org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:534) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:280) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:214) > at java.lang.Thread.run(Thread.java:745) > During handling of the above exception, another exception occurred: > Traceback (most recent call last): > File "", line 1, in > File > "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/sql/readwriter.py", > line 656, in text
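A commonly mentioned workaround (not an official fix, and shown here in Scala rather than the PySpark repro above) is to promote the nested field to a top-level column and partition by that; json is used instead of text only to avoid the text sink's single-string-column restriction:
{code}
import org.apache.spark.sql.functions.col

// df has schema struct<a: struct<b: bigint>>, mirroring the repro above.
df.withColumn("a_b", col("a.b"))   // hoist the nested field to a top-level column
  .write
  .partitionBy("a_b")              // partitionBy resolves only top-level columns
  .json("/tmp/test")               // the json sink accepts the remaining struct column
{code}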
[jira] [Created] (SPARK-22597) Add spark-sql script for Windows users
Hyukjin Kwon created SPARK-22597: Summary: Add spark-sql script for Windows users Key: SPARK-22597 URL: https://issues.apache.org/jira/browse/SPARK-22597 Project: Spark Issue Type: Improvement Components: SQL, Windows Affects Versions: 2.3.0 Reporter: Hyukjin Kwon Seems missing spark-sql.cmd. It'd be great if Windows users can use this functionality too. See - https://github.com/apache/spark/tree/master/bin Let me describe details in PR that I am going to open soon. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22597) Add spark-sql script for Windows users
[ https://issues.apache.org/jira/browse/SPARK-22597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22597: Assignee: (was: Apache Spark) > Add spark-sql script for Windows users > -- > > Key: SPARK-22597 > URL: https://issues.apache.org/jira/browse/SPARK-22597 > Project: Spark > Issue Type: Improvement > Components: SQL, Windows >Affects Versions: 2.3.0 >Reporter: Hyukjin Kwon > > Seems missing spark-sql.cmd. It'd be great if Windows users can use this > functionality too. > See - https://github.com/apache/spark/tree/master/bin > Let me describe details in PR that I am going to open soon. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22597) Add spark-sql script for Windows users
[ https://issues.apache.org/jira/browse/SPARK-22597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22597: Assignee: Apache Spark > Add spark-sql script for Windows users > -- > > Key: SPARK-22597 > URL: https://issues.apache.org/jira/browse/SPARK-22597 > Project: Spark > Issue Type: Improvement > Components: SQL, Windows >Affects Versions: 2.3.0 >Reporter: Hyukjin Kwon >Assignee: Apache Spark > > Seems missing spark-sql.cmd. It'd be great if Windows users can use this > functionality too. > See - https://github.com/apache/spark/tree/master/bin > Let me describe details in PR that I am going to open soon. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22597) Add spark-sql script for Windows users
[ https://issues.apache.org/jira/browse/SPARK-22597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264894#comment-16264894 ] Apache Spark commented on SPARK-22597: -- User 'HyukjinKwon' has created a pull request for this issue: https://github.com/apache/spark/pull/19808 > Add spark-sql script for Windows users > -- > > Key: SPARK-22597 > URL: https://issues.apache.org/jira/browse/SPARK-22597 > Project: Spark > Issue Type: Improvement > Components: SQL, Windows >Affects Versions: 2.3.0 >Reporter: Hyukjin Kwon > > Seems missing spark-sql.cmd. It'd be great if Windows users can use this > functionality too. > See - https://github.com/apache/spark/tree/master/bin > Let me describe details in PR that I am going to open soon. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Reopened] (SPARK-22561) Dynamically update topics list for spark kafka consumer
[ https://issues.apache.org/jira/browse/SPARK-22561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arun reopened SPARK-22561: -- Thanks [~c...@koeninger.org] - SubscribePattern allows you to use a regex to specify topics of interest. Note that unlike the 0.8 integration, using Subscribe or SubscribePattern should respond to adding partitions during a running stream. I tested SubscribePattern, and it works well when we don't want to pass an explicit list of topics - Spark Streaming can load topics based on the regex and start processing them. But my question is not about loading topics based on a pattern - the question is: once the stream is materialized and running, I would like to add a new topic on the fly without restarting the job. > Dynamically update topics list for spark kafka consumer > --- > > Key: SPARK-22561 > URL: https://issues.apache.org/jira/browse/SPARK-22561 > Project: Spark > Issue Type: New Feature > Components: DStreams >Affects Versions: 2.1.0, 2.1.1, 2.2.0 >Reporter: Arun > > The Spark Streaming application should allow adding new topics after the streaming > context is initialized and the DStream is started. This is a very useful feature, > especially when the business spans multiple geographies or business units. > For example, initially I have a spark-kafka consumer listening for the topics > ["topic-1","topic-2"], and after a couple of days I have added new topics to > Kafka ["topic-3","topic-4"]. Is there a way to update the spark-kafka > consumer's topic list and ask it to consume data for the updated list of > topics without stopping the Spark Streaming application or the streaming > context? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
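For context, a minimal sketch (not from this ticket) of how the 0.10 direct stream is wired up with SubscribePattern; the broker address, group id, and topic regex below are placeholders:

{code:scala}
import java.util.regex.Pattern

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SubscribePatternExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("SubscribePatternExample"), Seconds(10))

    // Placeholder broker address and group id.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker-1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest")

    // Topics matching this regex are subscribed to; as discussed above, new
    // matching topics should be picked up as the consumer refreshes its metadata,
    // but there is no API to change the subscription of an already running stream.
    val topicPattern = Pattern.compile("topic-.*")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.SubscribePattern[String, String](topicPattern, kafkaParams))

    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}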
[jira] [Commented] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264904#comment-16264904 ] Saisai Shao commented on SPARK-2926: [~XuanYuan], would you please use spark-perf's micro-benchmarks (https://github.com/databricks/spark-perf) to verify again with the same workload as in the original test report? That would be more comparable. Theoretically this solution cannot achieve a 12x-30x boost according to my tests, because it doesn't actually reduce the amount of computation; it just moves part of the comparison work from the reduce side to the map side, which potentially saves some CPU cycles and improves the cache hit rate. Can you please explain the key difference and the reason for such a boost? Thanks! > Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle > -- > > Key: SPARK-2926 > URL: https://issues.apache.org/jira/browse/SPARK-2926 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 1.1.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on > Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test > Report.pdf > > > Currently Spark has already integrated sort-based shuffle write, which > greatly improves the IO performance and reduces the memory consumption when > the number of reducers is very large. But the reducer side still adopts the > hash-based shuffle reader implementation, which neglects the ordering > attributes of map output data in some situations. > Here we propose an MR-style, sort-merge-like shuffle reader for sort-based > shuffle to further improve the performance of sort-based shuffle. > Work-in-progress code and a performance test report will be posted later > when some unit test bugs are fixed. > Any comments would be greatly appreciated. > Thanks a lot. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
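As a point of reference, a rough sketch of the kind of sort-heavy workload such a micro-benchmark exercises (record counts, partition counts, and key distribution here are invented and would have to match the original test report to be comparable):

{code:scala}
import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext}

object SortShuffleMicroBenchmark {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sort-shuffle-bench"))

    // Placeholder sizes; spark-perf's sort-by-key tests parameterize these.
    val numRecords = 100000000L
    val numPartitions = 200

    // Generate synthetic (key, value) pairs directly on the executors.
    val data = sc.parallelize(0 until numPartitions, numPartitions).flatMap { _ =>
      val rand = new Random()
      Iterator.fill((numRecords / numPartitions).toInt)((rand.nextInt(), rand.nextInt()))
    }

    val start = System.nanoTime()
    // sortByKey triggers the shuffle whose read path a merge-sort
    // SortShuffleReader would change; count() just forces evaluation.
    data.sortByKey(numPartitions = numPartitions).count()
    val elapsedSec = (System.nanoTime() - start) / 1e9
    println(s"sortByKey over $numRecords records took $elapsedSec s")

    sc.stop()
  }
}
{code}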
[jira] [Commented] (SPARK-22526) Spark hangs while reading binary files from S3
[ https://issues.apache.org/jira/browse/SPARK-22526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264931#comment-16264931 ] mohamed imran commented on SPARK-22526: --- [~ste...@apache.org] Thanks. But I have looked at some of the examples given in the Spark Git repository, and none of the code says to close the input stream explicitly when using sc.binaryFiles. Sample program from Git: https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FileSuite.scala So kindly update the Spark documentation with the exact syntax; it will avoid wrong assumptions and misunderstandings among developers. FYI, I haven't tested the code yet. I will test it and keep you posted shortly. > Spark hangs while reading binary files from S3 > -- > > Key: SPARK-22526 > URL: https://issues.apache.org/jira/browse/SPARK-22526 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: mohamed imran > Original Estimate: 168h > Remaining Estimate: 168h > > Hi, > I am using Spark 2.2.0 (a recent version) to read binary files from S3. I use > sc.binaryFiles to read the files. > It works fine for roughly the first 100 file reads, but after that it hangs > indefinitely for anywhere from 5 up to 40 minutes, similar to the Avro file read > issue (which was fixed in later releases). > I tried setting fs.s3a.connection.maximum to some large values, but it didn't > help. > Finally I ended up enabling the Spark speculation setting, which again didn't > help much. > One thing I observed is that it does not close the connection after every > read of a binary file from S3. > Example: sc.binaryFiles("s3a://test/test123.zip") > Please look into this major issue! -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
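For what it's worth, a minimal sketch of what closing the stream explicitly looks like with sc.binaryFiles (the S3 path is a placeholder, and the per-file work here just counts bytes):

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

object BinaryFilesRead {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-files-read"))

    // sc.binaryFiles yields (path, PortableDataStream) pairs; the stream is
    // only opened when open() is called, and should be closed by the caller.
    val sizes = sc.binaryFiles("s3a://some-bucket/some-prefix/*.zip").map {
      case (path, pds) =>
        val in = pds.open()
        try {
          var count = 0L
          val buf = new Array[Byte](8192)
          var n = in.read(buf)
          while (n != -1) {
            count += n
            n = in.read(buf)
          }
          (path, count)
        } finally {
          in.close() // release the underlying (e.g. S3) connection
        }
    }

    sizes.collect().foreach { case (p, c) => println(s"$p: $c bytes") }
    sc.stop()
  }
}
{code}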
[jira] [Commented] (SPARK-17920) HiveWriterContainer passes null configuration to serde.initialize, causing NullPointerException in AvroSerde when using avro.schema.url
[ https://issues.apache.org/jira/browse/SPARK-17920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264965#comment-16264965 ] Apache Spark commented on SPARK-17920: -- User 'vinodkc' has created a pull request for this issue: https://github.com/apache/spark/pull/19809 > HiveWriterContainer passes null configuration to serde.initialize, causing > NullPointerException in AvroSerde when using avro.schema.url > --- > > Key: SPARK-17920 > URL: https://issues.apache.org/jira/browse/SPARK-17920 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.2, 2.0.0 > Environment: AWS EMR 5.0.0: Spark 2.0.0, Hive 2.1.0 >Reporter: James Norvell >Assignee: Vinod KC >Priority: Minor > Fix For: 2.2.1, 2.3.0 > > Attachments: avro.avsc, avro_data > > > When HiveWriterContainer intializes a serde it explicitly passes null for the > Configuration: > https://github.com/apache/spark/blob/v2.0.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala#L161 > When attempting to write to a table stored as Avro with avro.schema.url set, > this causes a NullPointerException when it tries to get the FileSystem for > the URL: > https://github.com/apache/hive/blob/release-2.1.0-rc3/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java#L153 > Reproduction: > {noformat} > spark-sql> create external table avro_in (a string) stored as avro location > '/avro-in/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc'); > spark-sql> create external table avro_out (a string) stored as avro location > '/avro-out/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc'); > spark-sql> select * from avro_in; > hello > Time taken: 1.986 seconds, Fetched 1 row(s) > spark-sql> insert overwrite table avro_out select * from avro_in; > 16/10/13 19:34:47 WARN AvroSerDe: Encountered exception determining schema. 
> Returning signal schema to indicate problem > java.lang.NullPointerException > at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:182) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:359) > at > org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.getSchemaFromFS(AvroSerdeUtils.java:131) > at > org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrThrowException(AvroSerdeUtils.java:112) > at > org.apache.hadoop.hive.serde2.avro.AvroSerDe.determineSchemaOrReturnErrorSchema(AvroSerDe.java:167) > at > org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:103) > at > org.apache.spark.sql.hive.SparkHiveWriterContainer.newSerializer(hiveWriterContainers.scala:161) > at > org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:236) > at > org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:142) > at > org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:313) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) > at org.apache.spark.sql.Dataset.(Dataset.scala:186) > at org.apache.spark.sql.Dataset.(Dataset.scala:167) > at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) > at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682) > at > org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62) > at > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:331) > at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376) > at > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:247) > at > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) > a
[jira] [Commented] (SPARK-22587) Spark job fails if fs.defaultFS and application jar are different url
[ https://issues.apache.org/jira/browse/SPARK-22587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264986#comment-16264986 ] Saisai Shao commented on SPARK-22587: - [~Prabhu Joseph], I think it is because the logic that compares two filesystems ({{compareFs}}) does not work as expected for wasb: it identifies these two FSs as the same FS, while in fact they are two different FSs. That's why the subsequent {{makeQualified}} call throws an exception. > Spark job fails if fs.defaultFS and application jar are different url > - > > Key: SPARK-22587 > URL: https://issues.apache.org/jira/browse/SPARK-22587 > Project: Spark > Issue Type: Bug > Components: Spark Submit >Affects Versions: 1.6.3 >Reporter: Prabhu Joseph > > A Spark job fails if fs.defaultFS and the URL where the application jar resides > are different but have the same scheme: > spark-submit --conf spark.master=yarn-cluster wasb://XXX/tmp/test.py > In core-site.xml, fs.defaultFS is set to wasb:///YYY. Hadoop listing (hadoop > fs -ls) works for both URLs XXX and YYY. > {code} > Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: > wasb://XXX/tmp/test.py, expected: wasb://YYY > at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:665) > at org.apache.hadoop.fs.azure.NativeAzureFileSystem.checkPath(NativeAzureFileSystem.java:1251) > at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:485) > at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:396) > at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:507) > at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:660) > at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:912) > at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:172) > at org.apache.spark.deploy.yarn.Client.run(Client.scala:1248) > at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1307) > at org.apache.spark.deploy.yarn.Client.main(Client.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:751) > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > {code} > The code in Client.copyFileToRemote tries to resolve the path of the application > jar (XXX) from the FileSystem object created using the fs.defaultFS URL (YYY) > instead of the actual URL of the application jar: > val destFs = destDir.getFileSystem(hadoopConf) > val srcFs = srcPath.getFileSystem(hadoopConf) > getFileSystem creates the filesystem based on the URL of the path, so this is > fine. But the lines below try to qualify the srcPath (XXX URL) against the destFs > (YYY URL), and that is where it fails: > var destPath = srcPath > val qualifiedDestPath = destFs.makeQualified(destPath) -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
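To illustrate the point with plain java.net.URI (this is not the Spark code, and the wasb container/account names below are invented): a scheme-plus-host comparison treats the two URLs as the same filesystem, while their authorities actually differ, because wasb encodes the container in the user-info part of the authority.

{code:scala}
import java.net.URI
import java.util.Locale

// Illustration only: a simplified scheme + host + port comparison, similar in
// spirit to a compareFs-style check (no DNS resolution here).
object FsCompare {
  def sameFileSystem(a: URI, b: URI): Boolean = {
    def norm(s: String): String = Option(s).map(_.toLowerCase(Locale.ROOT)).getOrElse("")
    norm(a.getScheme) == norm(b.getScheme) &&
      norm(a.getHost) == norm(b.getHost) &&
      a.getPort == b.getPort
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical wasb URIs: different containers (XXX vs YYY) on the same account.
    val defaultFs = new URI("wasb://YYY@account.blob.core.windows.net/")
    val jarPath = new URI("wasb://XXX@account.blob.core.windows.net/tmp/test.py")

    println(sameFileSystem(defaultFs, jarPath))              // true  - host-only check says "same FS"
    println(defaultFs.getAuthority == jarPath.getAuthority)  // false - the containers differ
  }
}
{code}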
[jira] [Created] (SPARK-22598) ExecutorAllocationManager hangs when an executor fails and numExecutorsTarget has not changed
Lijia Liu created SPARK-22598: - Summary: ExecutorAllocationManager hangs when an executor fails and numExecutorsTarget has not changed Key: SPARK-22598 URL: https://issues.apache.org/jira/browse/SPARK-22598 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.0 Reporter: Lijia Liu ExecutorAllocationManager only calls ExecutorAllocationClient.requestTotalExecutors to request new executors when the target has changed. But when an executor fails and the target has not changed, no new executors are requested and the Spark job hangs. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
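For reference, the code path in question is only exercised when dynamic allocation is enabled; a minimal configuration sketch (the min/max values are placeholders):

{code:scala}
import org.apache.spark.SparkConf

object DynamicAllocationConfig {
  def main(args: Array[String]): Unit = {
    // ExecutorAllocationManager is only active with dynamic allocation on;
    // the external shuffle service is required for it on YARN.
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "50")
    conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
  }
}
{code}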
[jira] [Created] (SPARK-22599) Avoid extra reading for cached table
Nan Zhu created SPARK-22599: --- Summary: Avoid extra reading for cached table Key: SPARK-22599 URL: https://issues.apache.org/jira/browse/SPARK-22599 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Nan Zhu In the current implementation of Spark, InMemoryTableExec reads all data in a cached table, filters CachedBatches according to stats, and passes the data to the downstream operators. This implementation makes it inefficient to keep the whole table in memory to serve various queries against different partitions of the table, which covers a certain portion of our users' scenarios. The following is an example of such a use case: store_sales is a 1TB-sized table in cloud storage, which is partitioned by 'location'. The first query, Q1, wants to output several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis of the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole table in memory in Q1. With the current implementation, even if any one of the data scientists is only interested in one out of the three locations, the queries they submit to the Spark cluster still read the full 1TB of data. The reason behind the extra reading is that we implement CachedBatch as {code:scala} case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) {code} where "stats" is a part of every CachedBatch, so we can only filter batches for the output of the InMemoryTableExec operator by reading all data in the in-memory table as input. The extra reading would be even more unacceptable when some of the table's data is evicted to disk. We propose to introduce a new type of block, a metadata block, for the partitions of the RDD representing the data in the cached table. Every metadata block contains stats info for all columns in a partition and is saved to BlockManager when executing the compute() method for the partition, to minimize the number of bytes to read. More details can be found in the design doc: https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
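To make the proposal a bit more concrete, a hypothetical sketch (CachedBatch is the existing class quoted in the description; PartitionMetadataBlock and the pruning helper are invented names for illustration, not the actual design):

{code:scala}
import org.apache.spark.sql.catalyst.InternalRow

// Existing shape, as quoted in the description: stats travel with the data.
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)

// Hypothetical: per-partition stats kept in a small, separate metadata block,
// so a filter such as location = 'US' could skip whole partitions without
// ever materializing their CachedBatches.
case class PartitionMetadataBlock(partitionId: Int, columnStats: InternalRow)

object CachedTablePruning {
  // Placeholder predicate over per-partition stats (e.g. min/max of the
  // filtered column); the real design would evaluate the query's filter here.
  def mayContainMatches(meta: PartitionMetadataBlock): Boolean = true

  def partitionsToScan(metas: Seq[PartitionMetadataBlock]): Seq[Int] =
    metas.filter(mayContainMatches).map(_.partitionId)
}
{code}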
[jira] [Updated] (SPARK-22599) Avoid extra reading for cached table
[ https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nan Zhu updated SPARK-22599: Description: In the current implementation of Spark, InMemoryTableExec read all data in a cached table, filter CachedBatch according to stats and pass data to the downstream operators. This implementation makes it inefficient to reside the whole table in memory to serve various queries against different partitions of the table, which occupies a certain portion of our users' scenarios. The following is an example of such a use case: store_sales is a 1TB-sized table in cloud storage, which is partitioned by 'location'. The first query, Q1, wants to output several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis for the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole table in memory in Q1. With the current implementation, even any one of the data scientists is only interested in one out of three locations, the queries they submit to Spark cluster is still reading 1TB data completely. The reason behind the extra reading operation is that we implement CachedBatch as {code} case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) {code} where "stats" is a part of every CachedBatch, so we can only filter batches for output of InMemoryTableExec operator by reading all data in in-memory table as input. The extra reading would be even more unacceptable when some of the table's data is evicted to disks. We propose to introduce a new type of block, metadata block, for the partitions of RDD representing data in the cached table. Every metadata block contains stats info for all columns in a partition and is saved to BlockManager when executing compute() method for the partition. To minimize the number of bytes to read, More details can be found in design doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing was: In the current implementation of Spark, InMemoryTableExec read all data in a cached table, filter CachedBatch according to stats and pass data to the downstream operators. This implementation makes it inefficient to reside the whole table in memory to serve various queries against different partitions of the table, which occupies a certain portion of our users' scenarios. The following is an example of such a use case: store_sales is a 1TB-sized table in cloud storage, which is partitioned by 'location'. The first query, Q1, wants to output several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis for the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole table in memory in Q1. With the current implementation, even any one of the data scientists is only interested in one out of three locations, the queries they submit to Spark cluster is still reading 1TB data completely. The reason behind the extra reading operation is that we implement CachedBatch as {code:scala} case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) {code} where "stats" is a part of every CachedBatch, so we can only filter batches for output of InMemoryTableExec operator by reading all data in in-memory table as input. 
The extra reading would be even more unacceptable when some of the table's data is evicted to disks. We propose to introduce a new type of block, metadata block, for the partitions of RDD representing data in the cached table. Every metadata block contains stats info for all columns in a partition and is saved to BlockManager when executing compute() method for the partition. To minimize the number of bytes to read, More details can be found in design doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing > Avoid extra reading for cached table > > > Key: SPARK-22599 > URL: https://issues.apache.org/jira/browse/SPARK-22599 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Nan Zhu > > In the current implementation of Spark, InMemoryTableExec read all data in a > cached table, filter CachedBatch according to stats and pass data to the > downstream operators. This implementation makes it inefficient to reside the > whole table in memory to serve various queries against different partitions > of the table, which occupies a certain portion of our users' scenarios. > The following is an example of such a use case: > store_sales is a 1TB-sized table in cloud storage, which is part
[jira] [Assigned] (SPARK-22599) Avoid extra reading for cached table
[ https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22599: Assignee: Apache Spark > Avoid extra reading for cached table > > > Key: SPARK-22599 > URL: https://issues.apache.org/jira/browse/SPARK-22599 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Nan Zhu >Assignee: Apache Spark > > In the current implementation of Spark, InMemoryTableExec read all data in a > cached table, filter CachedBatch according to stats and pass data to the > downstream operators. This implementation makes it inefficient to reside the > whole table in memory to serve various queries against different partitions > of the table, which occupies a certain portion of our users' scenarios. > The following is an example of such a use case: > store_sales is a 1TB-sized table in cloud storage, which is partitioned by > 'location'. The first query, Q1, wants to output several metrics A, B, C for > all stores in all locations. After that, a small team of 3 data scientists > wants to do some causal analysis for the sales in different locations. To > avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache > the whole table in memory in Q1. > With the current implementation, even any one of the data scientists is only > interested in one out of three locations, the queries they submit to Spark > cluster is still reading 1TB data completely. > The reason behind the extra reading operation is that we implement > CachedBatch as > {code} > case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: > InternalRow) > {code} > where "stats" is a part of every CachedBatch, so we can only filter batches > for output of InMemoryTableExec operator by reading all data in in-memory > table as input. The extra reading would be even more unacceptable when some > of the table's data is evicted to disks. > We propose to introduce a new type of block, metadata block, for the > partitions of RDD representing data in the cached table. Every metadata block > contains stats info for all columns in a partition and is saved to > BlockManager when executing compute() method for the partition. To minimize > the number of bytes to read, > More details can be found in design > doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22599) Avoid extra reading for cached table
[ https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265014#comment-16265014 ] Apache Spark commented on SPARK-22599: -- User 'CodingCat' has created a pull request for this issue: https://github.com/apache/spark/pull/19810 > Avoid extra reading for cached table > > > Key: SPARK-22599 > URL: https://issues.apache.org/jira/browse/SPARK-22599 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Nan Zhu > > In the current implementation of Spark, InMemoryTableExec read all data in a > cached table, filter CachedBatch according to stats and pass data to the > downstream operators. This implementation makes it inefficient to reside the > whole table in memory to serve various queries against different partitions > of the table, which occupies a certain portion of our users' scenarios. > The following is an example of such a use case: > store_sales is a 1TB-sized table in cloud storage, which is partitioned by > 'location'. The first query, Q1, wants to output several metrics A, B, C for > all stores in all locations. After that, a small team of 3 data scientists > wants to do some causal analysis for the sales in different locations. To > avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache > the whole table in memory in Q1. > With the current implementation, even any one of the data scientists is only > interested in one out of three locations, the queries they submit to Spark > cluster is still reading 1TB data completely. > The reason behind the extra reading operation is that we implement > CachedBatch as > {code} > case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: > InternalRow) > {code} > where "stats" is a part of every CachedBatch, so we can only filter batches > for output of InMemoryTableExec operator by reading all data in in-memory > table as input. The extra reading would be even more unacceptable when some > of the table's data is evicted to disks. > We propose to introduce a new type of block, metadata block, for the > partitions of RDD representing data in the cached table. Every metadata block > contains stats info for all columns in a partition and is saved to > BlockManager when executing compute() method for the partition. To minimize > the number of bytes to read, > More details can be found in design > doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22599) Avoid extra reading for cached table
[ https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-22599: Assignee: (was: Apache Spark) > Avoid extra reading for cached table > > > Key: SPARK-22599 > URL: https://issues.apache.org/jira/browse/SPARK-22599 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Nan Zhu > > In the current implementation of Spark, InMemoryTableExec read all data in a > cached table, filter CachedBatch according to stats and pass data to the > downstream operators. This implementation makes it inefficient to reside the > whole table in memory to serve various queries against different partitions > of the table, which occupies a certain portion of our users' scenarios. > The following is an example of such a use case: > store_sales is a 1TB-sized table in cloud storage, which is partitioned by > 'location'. The first query, Q1, wants to output several metrics A, B, C for > all stores in all locations. After that, a small team of 3 data scientists > wants to do some causal analysis for the sales in different locations. To > avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache > the whole table in memory in Q1. > With the current implementation, even any one of the data scientists is only > interested in one out of three locations, the queries they submit to Spark > cluster is still reading 1TB data completely. > The reason behind the extra reading operation is that we implement > CachedBatch as > {code} > case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: > InternalRow) > {code} > where "stats" is a part of every CachedBatch, so we can only filter batches > for output of InMemoryTableExec operator by reading all data in in-memory > table as input. The extra reading would be even more unacceptable when some > of the table's data is evicted to disks. > We propose to introduce a new type of block, metadata block, for the > partitions of RDD representing data in the cached table. Every metadata block > contains stats info for all columns in a partition and is saved to > BlockManager when executing compute() method for the partition. To minimize > the number of bytes to read, > More details can be found in design > doc:https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265021#comment-16265021 ] Saisai Shao commented on SPARK-22579: - I think this issue should have already been fixed by SPARK-22062 and PR(https://github.com/apache/spark/pull/19476), what you need to do is to set a proper size for large blocks. > BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be > implemented using streaming > -- > > Key: SPARK-22579 > URL: https://issues.apache.org/jira/browse/SPARK-22579 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Spark Core >Affects Versions: 2.1.0 >Reporter: Eyal Farago > > when an RDD partition is cached on an executor bu the task requiring it is > running on another executor (process locality ANY), the cached partition is > fetched via BlockManager.getRemoteValues which delegates to > BlockManager.getRemoteBytes, both calls are blocking. > in my use case I had a 700GB RDD spread over 1000 partitions on a 6 nodes > cluster, cached to disk. rough math shows that average partition size is > 700MB. > looking at spark UI it was obvious that tasks running with process locality > 'ANY' are much slower than local tasks (~40 seconds to 8-10 minutes ratio), I > was able to capture thread dumps of executors executing remote tasks and got > this stake trace: > {quote}Thread ID Thread Name Thread StateThread Locks > 1521 Executor task launch worker-1000WAITING > Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978}) > sun.misc.Unsafe.park(Native Method) > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > scala.concurrent.Await$.result(package.scala:190) > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) > org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104) > org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582) > org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550) > org.apache.spark.storage.BlockManager.get(BlockManager.scala:638) > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690) > org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) > org.apache.spark.rdd.RDD.iterator(RDD.scala:285) > org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote} > digging into the code showed that the block manager first fetches all bytes > (getRemoteBytes) and then wraps it with a 
deserialization stream. This has > several drawbacks: > 1. blocking: the requesting executor is blocked while the remote executor is > serving the block. > 2. potentially large memory footprint on the requesting executor; in my use case > 700MB of raw bytes stored in a ChunkedByteBuffer. > 3. inefficient: the requesting side usually doesn't need all values at once, as it > consumes the values via an iterator. > 4. potentially large memory footprint on the serving executor: if the block > is cached in deserialized form, the serving executor has to serialize it into > a ChunkedByteBuffer (BlockManager.doGetLocalBytes). This is both memory and CPU > intensive; the memory footprint can be reduced by using a limited buffer for > serialization, 'spilling' to the response stream. > I suggest improving this either by implementing a full streaming mechanism or > some kind of pagination mechanism; in addition, the requesting executor should > be able to make progress with the data it already has, blocking only when the > local buffer is exhausted and the remote side hasn't delivered the next chunk of > the stream (or page, in the case of pagination) yet. -- This message was sent by Atlassian JIRA (v6.4.1
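If the PR referenced above is the one that added fetch-to-disk for large remote blocks, the knob to tune is presumably the max-remote-block-size-fetch-to-mem threshold; a sketch, assuming the property name below is correct for the Spark version in use (worth double-checking against its documentation):

{code:scala}
import org.apache.spark.SparkConf

object RemoteFetchTuning {
  def main(args: Array[String]): Unit = {
    // Assumption: blocks larger than this threshold are streamed to disk on the
    // requesting executor instead of being buffered fully in memory.
    val conf = new SparkConf()
      .setAppName("remote-fetch-tuning")
      .set("spark.maxRemoteBlockSizeFetchToMem", (200L * 1024 * 1024).toString) // 200 MB
    println(conf.get("spark.maxRemoteBlockSizeFetchToMem"))
  }
}
{code}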