[jira] [Commented] (SPARK-36240) Graceful termination of Spark Structured Streaming queries
[ https://issues.apache.org/jira/browse/SPARK-36240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17449458#comment-17449458 ]

yufei ding commented on SPARK-36240:
------------------------------------

In which version of Spark is this feature expected to be available?

> Graceful termination of Spark Structured Streaming queries
> ----------------------------------------------------------
>
>                 Key: SPARK-36240
>                 URL: https://issues.apache.org/jira/browse/SPARK-36240
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>            Reporter: Zoltán Zvara
>            Priority: Major
>
> Spark Streaming provides a way to gracefully stop a streaming application
> through the configuration parameter {{spark.streaming.stopGracefullyOnShutdown}}.
> The documentation states:
> {quote}If {{true}}, Spark shuts down the {{StreamingContext}} gracefully on
> JVM shutdown rather than immediately.{quote}
> This effectively stops job generation (see {{JobGenerator}} in Spark
> Streaming) and lets the current {{Job}} (corresponding to a micro-batch)
> finish, instead of canceling the active job itself.
> Some applications require graceful stopping so that their output remains
> consistent: output written out halfway causes problems for applications
> that need "exactly-once" semantics.
> Structured Streaming has no support for gracefully stopping
> queries/streaming applications.
> Naive solutions found on the web propose checking whether the queries are
> active via {{query.isActive}}, or checking query state directly, and then
> calling {{stop()}} at the right time. With the current Structured Streaming
> implementation, {{stop()}} cancels the job group, which can still lead to
> inconsistent output because the outcome depends on the timing of the
> cancellation.
> _Proposed solution:_
> Strictly speaking, in the context of the micro-batch execution model, a
> {{StreamingQuery}} that we want to gracefully stop would be of the
> implementation {{MicroBatchExecution}}. The motivation is similar to that
> of the Streaming Context's gracefulness: stop the "job generation" and
> then wait for any active job to finish, instead of canceling the jobs.
> Micro-batch scheduling is managed by a {{ProcessingTimeExecutor}} of the
> {{MicroBatchExecution}} class:
>
> {code:java}
> private val triggerExecutor = trigger match {
>   case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
>   case OneTimeTrigger => OneTimeExecutor()
>   case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
> }
> {code}
>
> The following while-true loop is run by the job generation mechanism. The
> {{triggerHandler}} is a callback that generates the micro-batches.
>
> {code:java}
> override def execute(triggerHandler: () => Boolean): Unit = {
>   while (true) {
>     val triggerTimeMs = clock.getTimeMillis
>     val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
>     val terminated = !triggerHandler()
>     if (intervalMs > 0) {
>       val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
>       if (batchElapsedTimeMs > intervalMs) {
>         notifyBatchFallingBehind(batchElapsedTimeMs)
>       }
>       if (terminated) {
>         return
>       }
>       clock.waitTillTime(nextTriggerTimeMs)
>     } else {
>       if (terminated) {
>         return
>       }
>     }
>   }
> }
> {code}
>
> Here, a {{gracefulStop()}} signal from the queries could signal
> {{ProcessingTimeExecutor}} to stop triggering new batches. Another
> mechanism is then required to wait until any current job is finished;
> it would then call {{stop()}}, after which the {{SparkSession}} may be
> stopped as well.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
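The proposed mechanism can be sketched outside Spark as a plain Python loop: a stop flag is consulted only between batches, so an in-flight batch always runs to completion before the loop exits. This is an illustrative model of the idea only, not Spark code; the class name `GracefulTriggerExecutor` and its methods are invented for the sketch.

```python
import threading
import time

class GracefulTriggerExecutor:
    """Toy model of a micro-batch trigger loop with graceful stop.

    The stop flag is only checked *between* batches, so a batch that has
    already started always completes (the "graceful" part); nothing is
    cancelled mid-flight.
    """

    def __init__(self, interval_s):
        self.interval_s = interval_s
        self._stop = threading.Event()
        self.completed_batches = []

    def graceful_stop(self):
        # Signal the loop; do NOT cancel the running batch.
        self._stop.set()

    def execute(self, trigger_handler):
        batch_id = 0
        while True:
            if self._stop.is_set():
                return                      # stop triggering new batches
            trigger_handler(batch_id)       # runs one full micro-batch
            self.completed_batches.append(batch_id)
            batch_id += 1
            # Sleep until the next trigger, but wake early on stop.
            if self._stop.wait(self.interval_s):
                return

executor = GracefulTriggerExecutor(interval_s=0.01)

def batch(batch_id):
    time.sleep(0.02)                # simulated micro-batch work
    if batch_id == 2:
        executor.graceful_stop()    # stop requested while batch 2 runs

executor.execute(batch)
print(executor.completed_batches)   # batch 2 still completed: [0, 1, 2]
```

The key property mirrors the proposal: the stop request arrives in the middle of batch 2, yet batch 2 still appears in `completed_batches`, because the flag is only acted on at batch boundaries.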
[jira] [Updated] (SPARK-35299) Dataframe overwrite on S3 does not delete old files with S3 object-put to table path
[ https://issues.apache.org/jira/browse/SPARK-35299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yusheng Ding updated SPARK-35299:
---------------------------------
    Description: 
To reproduce:

test_table path: s3a://test_bucket/test_table/

df = spark_session.sql("SELECT * FROM test_table")
df.count()  # returns a row count of 1000

## S3 operation ##
s3 = boto3.client("s3")
s3.put_object(
    Bucket="test_bucket", Body="", Key=f"test_table/"
)
## S3 operation ##

df.write.insertInto(test_table, overwrite=True)
# Same goes for df.write.save(mode="overwrite", format="parquet", path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")
df.count()  # returns a row count of 2000

Overwrite is not functioning correctly. Old files are not deleted on S3.
> Dataframe overwrite on S3 does not delete old files with S3 object-put to
> table path
> -------------------------------------------------------------------------
>
>                 Key: SPARK-35299
>                 URL: https://issues.apache.org/jira/browse/SPARK-35299
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Yusheng Ding
>            Priority: Major
>              Labels: aws-s3, dataframe, hive, spark
>
> To reproduce:
>
> test_table path: s3a://test_bucket/test_table/
>
> df = spark_session.sql("SELECT * FROM test_table")
> df.count()  # returns a row count of 1000
>
> ## S3 operation ##
> s3 = boto3.client("s3")
> s3.put_object(
>     Bucket="test_bucket", Body="", Key=f"test_table/"
> )
> ## S3 operation ##
>
> df.write.insertInto(test_table, overwrite=True)
> # Same goes for df.write.save(mode="overwrite", format="parquet", path="s3a://test_bucket/test_table")
>
> df = spark_session.sql("SELECT * FROM test_table")
> df.count()  # returns a row count of 2000
>
> Overwrite is not functioning correctly. Old files are not deleted on S3.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (SPARK-35299) Dataframe overwrite on S3 does not delete old files with S3 object-put to table path
Yusheng Ding created SPARK-35299:
------------------------------------

             Summary: Dataframe overwrite on S3 does not delete old files with S3 object-put to table path
                 Key: SPARK-35299
                 URL: https://issues.apache.org/jira/browse/SPARK-35299
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.2.0
            Reporter: Yusheng Ding

To reproduce:

test_table path: s3a://test_bucket/test_table/

df = spark_session.sql("SELECT * FROM test_table")
df.count()  # returns a row count of 1000

## S3 operation ##
s3 = boto3.client("s3")
s3.put_object(
    Bucket="test_bucket", Body="", Key=f"test_table/"
)
## S3 operation ##

df.write.insertInto(test_table, overwrite=True)
# Same goes for df.write.save(mode="overwrite", format="parquet", path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")
df.count()  # returns a row count of 2000

Overwrite is not functioning correctly. Old files are not deleted on S3.
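One way to confirm the reported behavior is to list the object keys under the table prefix before and after the overwrite and intersect the two listings: a correct overwrite should leave nothing from the old listing except, at most, the zero-byte directory marker. The diff itself is pure Python; in a real check the listings would come from boto3's `list_objects_v2` paginator. The helper name `surviving_keys` and the sample key names below are invented for this sketch.

```python
def surviving_keys(before, after):
    """Keys present in both listings.

    After a correct overwrite this should contain at most the directory
    marker; the bug report implies the old data files also survive.
    """
    return sorted(set(before) & set(after))

# Listing before the overwrite: the zero-byte marker written by
# put_object(Key="test_table/") plus the old data file.
before = [
    "test_table/",
    "test_table/part-00000.parquet",
]
# Listing after the overwrite, as the report describes: old file remains.
after = [
    "test_table/",
    "test_table/part-00000.parquet",
    "test_table/part-00000-new.parquet",
]
print(surviving_keys(before, after))
# -> ['test_table/', 'test_table/part-00000.parquet']  (old file not deleted)
```

In the failing scenario the old parquet file shows up in the intersection, which is exactly why the second `df.count()` doubles: the reader picks up both the old and the new files under the prefix.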
[jira] [Updated] (SPARK-31933) Found another deadlock in Spark Driver
[ https://issues.apache.org/jira/browse/SPARK-31933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenning Ding updated SPARK-31933:
---------------------------------
    Description: 
This is another Spark java-level deadlock we found. It looks similar to
https://issues.apache.org/jira/browse/SPARK-26961, but I think they are
actually different issues. This deadlock appears to be caused by
FsUrlStreamHandlerFactory.

One straightforward way to fix it is to change FsUrlStreamHandlerFactory.java#L74 from
{{FileSystem.getFileSystemClass(protocol, conf);}} to
{{FileSystem.getFileSystemClass(protocol, new Configuration(conf));}}
but I am not sure whether this is acceptable on the Hadoop side, or whether
there is a better way to solve this from the Spark side.

{code:java}
"SparkUI-60":
  waiting to lock monitor 0x7f511ca22728 (object 0x00068c6e9060, a org.apache.hadoop.conf.Configuration),
  which is held by "Driver"
"Driver":
  waiting to lock monitor 0x7f511c4fe448 (object 0x000400079600, a java.util.HashMap),
  which is held by "SparkUI-60"
{code}
{code:java}
"SparkUI-60":
	at org.apache.hadoop.conf.Configuration.getOverlay(Configuration.java:1328)
	- waiting to lock <0x00068c6e9060> (a org.apache.hadoop.conf.Configuration)
	at org.apache.hadoop.conf.Configuration.handleDeprecation(Configuration.java:684)
	at org.apache.hadoop.conf.Configuration.get(Configuration.java:1088)
	at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1145)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2363)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2840)
	at org.apache.hadoop.fs.FsUrlStreamHandlerFactory.createURLStreamHandler(FsUrlStreamHandlerFactory.java:74)
	at java.net.URL.getURLStreamHandler(URL.java:1165)
	at java.net.URL.<init>(URL.java:422)
	at java.net.URL.<init>(URL.java:312)
	at java.net.URL.<init>(URL.java:335)
	at sun.net.www.ParseUtil.fileToEncodedURL(ParseUtil.java:272)
	at java.lang.Package$1.run(Package.java:579)
	at java.lang.Package$1.run(Package.java:570)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.lang.Package.defineSystemPackage(Package.java:570)
	at java.lang.Package.getSystemPackage(Package.java:546)
	- locked <0x000400079600> (a java.util.HashMap)
	at java.lang.ClassLoader.getPackage(ClassLoader.java:1630)
	at java.lang.ClassLoader.getPackage(ClassLoader.java:1628)
	at java.net.URLClassLoader.getAndVerifyPackage(URLClassLoader.java:394)
	at java.net.URLClassLoader.definePackageInternal(URLClassLoader.java:420)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:452)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	- locked <0x00068dd20910> (a java.lang.Object)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	at org.glassfish.jersey.model.internal.ComponentBag.modelFor(ComponentBag.java:483)
	at org.glassfish.jersey.model.internal.ComponentBag.access$100(ComponentBag.java:89)
	at org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:408)
	at org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:398)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:228)
	at org.glassfish.jersey.model.internal.ComponentBag.registerModel(ComponentBag.java:398)
	at org.glassfish.jersey.model.internal.ComponentBag.register(ComponentBag.java:235)
	at org.glassfish.jersey.model.internal.CommonConfig.register(CommonConfig.java:420)
	at org.glassfish.jersey.server.ResourceConfig.register(ResourceConfig.java:425)
	at org.glassfish.jersey.server.ResourceConfig.registerClasses(ResourceConfig.java:501)
	at org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.<init>(ResourceConfig.java:1212)
	at org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.<init>(ResourceConfig.java:1178)
	at org.glassfish.jersey.server.ResourceConfig.createRuntimeConfig(ResourceConfig.java:1174)
	at org.glassfish.jersey.server.ApplicationHandler.<init>(ApplicationHandler.java:345)
	at org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:392)
	at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
	at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
	at javax.servlet.GenericServlet.init(GenericServlet.java:244)
	at org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643)
{code}
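The proposed `new Configuration(conf)` fix works because copying the shared configuration removes the need to hold its monitor while class loading takes the class-loader lock, so the A-then-B / B-then-A lock cycle cannot form. A toy Python model of that pattern follows; the lock names, config keys, and helper functions are invented for illustration and do not correspond to real Spark or Hadoop APIs.

```python
import threading

shared_conf = {"fs.s3a.impl": "S3AFileSystem"}
conf_lock = threading.Lock()     # plays the Configuration monitor
loader_lock = threading.Lock()   # plays the ClassLoader's HashMap monitor

results = []

def lookup_with_copy(key):
    # The fix in miniature: snapshot the shared config under its own lock,
    # then RELEASE that lock before touching the class-loader lock. No
    # thread ever holds both monitors at once, so no deadlock cycle forms.
    with conf_lock:
        snapshot = dict(shared_conf)
    with loader_lock:
        results.append(snapshot.get(key))

def define_package():
    # The other side of the original cycle: it takes the loader lock first.
    # With the copy in place, the lookup thread never waits on us while
    # holding the conf lock.
    with loader_lock:
        results.append("package-defined")

t1 = threading.Thread(target=lookup_with_copy, args=("fs.s3a.impl",))
t2 = threading.Thread(target=define_package)
t1.start(); t2.start()
t1.join(); t2.join()
print(sorted(results))  # both threads completed
```

Had `lookup_with_copy` kept `conf_lock` held while acquiring `loader_lock` (and `define_package` needed `conf_lock` in turn), the two threads could block each other forever, which is the shape of the thread dump above.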
[jira] [Created] (SPARK-31933) Found another deadlock in Spark Driver
Wenning Ding created SPARK-31933: Summary: Found another deadlock in Spark Driver Key: SPARK-31933 URL: https://issues.apache.org/jira/browse/SPARK-31933 Project: Spark Issue Type: Bug Components: Spark Submit Affects Versions: 2.4.6, 3.1.0 Reporter: Wenning Ding This is another Spark java-level deadlock we found: it looks similar to https://issues.apache.org/jira/browse/SPARK-26961, but I think actually they are different issues. {code:java} "SparkUI-60": waiting to lock monitor 0x7f511ca22728 (object 0x00068c6e9060, a org.apache.hadoop.conf.Configuration), which is held by "Driver" "Driver": waiting to lock monitor 0x7f511c4fe448 (object 0x000400079600, a java.util.HashMap), which is held by "SparkUI-60" {code} {code:java} "SparkUI-60": at org.apache.hadoop.conf.Configuration.getOverlay(Configuration.java:1328) - waiting to lock <0x00068c6e9060> (a org.apache.hadoop.conf.Configuration) at org.apache.hadoop.conf.Configuration.handleDeprecation(Configuration.java:684) at org.apache.hadoop.conf.Configuration.get(Configuration.java:1088) at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1145) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2363) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2840) at org.apache.hadoop.fs.FsUrlStreamHandlerFactory.createURLStreamHandler(FsUrlStreamHandlerFactory.java:74) at java.net.URL.getURLStreamHandler(URL.java:1165) at java.net.URL.(URL.java:422) at java.net.URL.(URL.java:312) at java.net.URL.(URL.java:335) at sun.net.www.ParseUtil.fileToEncodedURL(ParseUtil.java:272) at java.lang.Package$1.run(Package.java:579) at java.lang.Package$1.run(Package.java:570) at java.security.AccessController.doPrivileged(Native Method) at java.lang.Package.defineSystemPackage(Package.java:570) at java.lang.Package.getSystemPackage(Package.java:546) - locked <0x000400079600> (a java.util.HashMap) at java.lang.ClassLoader.getPackage(ClassLoader.java:1630) at 
java.lang.ClassLoader.getPackage(ClassLoader.java:1628) at java.net.URLClassLoader.getAndVerifyPackage(URLClassLoader.java:394) at java.net.URLClassLoader.definePackageInternal(URLClassLoader.java:420) at java.net.URLClassLoader.defineClass(URLClassLoader.java:452) at java.net.URLClassLoader.access$100(URLClassLoader.java:74) at java.net.URLClassLoader$1.run(URLClassLoader.java:369) at java.net.URLClassLoader$1.run(URLClassLoader.java:363) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:362) at java.lang.ClassLoader.loadClass(ClassLoader.java:419) - locked <0x00068dd20910> (a java.lang.Object) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:352) at org.glassfish.jersey.model.internal.ComponentBag.modelFor(ComponentBag.java:483) at org.glassfish.jersey.model.internal.ComponentBag.access$100(ComponentBag.java:89) at org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:408) at org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:398) at org.glassfish.jersey.internal.Errors.process(Errors.java:315) at org.glassfish.jersey.internal.Errors.process(Errors.java:297) at org.glassfish.jersey.internal.Errors.process(Errors.java:228) at org.glassfish.jersey.model.internal.ComponentBag.registerModel(ComponentBag.java:398) at org.glassfish.jersey.model.internal.ComponentBag.register(ComponentBag.java:235) at org.glassfish.jersey.model.internal.CommonConfig.register(CommonConfig.java:420) at org.glassfish.jersey.server.ResourceConfig.register(ResourceConfig.java:425) at org.glassfish.jersey.server.ResourceConfig.registerClasses(ResourceConfig.java:501) at org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.(ResourceConfig.java:1212) at org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.(ResourceConfig.java:1178) at 
org.glassfish.jersey.server.ResourceConfig.createRuntimeConfig(ResourceConfig.java:1174) at org.glassfish.jersey.server.ApplicationHandler.(ApplicationHandler.java:345) at org.glassfish.jersey.servlet.WebComponent.(WebComponent.java:392) at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177) at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369) at javax.servlet.GenericServlet.init(GenericServlet.java:244) at org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643) at org.spark_project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:499) - locked <0x000407a5a840> (a org.spark_project.jetty.servlet.ServletHolder) at org.spark_project.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:791) - locked <0x000407a5a840> (a
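The two threads above block each other because they acquire the same two monitors (the Hadoop Configuration object and the java.lang.Package HashMap) in opposite orders. A minimal Java sketch of the pattern and the standard remedy — one global lock order — using illustrative names, not Spark's actual classes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LockOrdering {
    // Two monitors standing in for the Configuration object and the
    // java.lang.Package HashMap from the thread dump (names are illustrative).
    private static final Object confLock = new Object();
    private static final Object pkgLock = new Object();
    private static int work = 0;

    // The fix for a lock-order deadlock: every code path acquires the two
    // monitors in the same global order, so no cyclic wait can form.
    private static void doWork() {
        synchronized (confLock) {
            synchronized (pkgLock) {
                work++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 1000; i++) {
            pool.submit(LockOrdering::doWork);
        }
        pool.shutdown();
        // With a consistent order this always terminates; if one path took
        // pkgLock before confLock (as "SparkUI-60" vs "Driver" do above),
        // two threads could block each other forever.
        boolean done = pool.awaitTermination(10, TimeUnit.SECONDS);
        int snapshot;
        synchronized (confLock) {
            synchronized (pkgLock) {
                snapshot = work;
            }
        }
        System.out.println(done + " " + snapshot);
    }
}
```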
[jira] [Commented] (SPARK-29868) Add a benchmark for Adaptive Execution
[ https://issues.apache.org/jira/browse/SPARK-29868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16972974#comment-16972974 ] leonard ding commented on SPARK-29868: -- I will pick it up. > Add a benchmark for Adaptive Execution > -- > > Key: SPARK-29868 > URL: https://issues.apache.org/jira/browse/SPARK-29868 > Project: Spark > Issue Type: Test > Components: SQL, Tests >Affects Versions: 3.0.0 >Reporter: Yuming Wang >Priority: Major > Attachments: BroadcastJoin.jpeg, SortMergeJoin.jpeg > > > Add a benchmark for Adaptive Execution to evaluate SortMergeJoin-to-BroadcastJoin performance. > It seems SortMergeJoin is faster than BroadcastJoin if one side is a bucketed > table that could otherwise be converted to BroadcastJoin. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
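For anyone reproducing such a benchmark: adaptive execution and the broadcast-join threshold it interacts with are governed by standard Spark SQL settings. A spark-defaults.conf sketch (the threshold shown is Spark's documented 10 MB default, not a tuning recommendation):

{code}
spark.sql.adaptive.enabled              true
spark.sql.autoBroadcastJoinThreshold    10485760
{code}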
[jira] [Comment Edited] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6
[ https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15578987#comment-15578987 ] ding edited comment on SPARK-17951 at 10/16/16 12:22 AM: - I tried to call rdd.collect which internally called bm.getRemoteBytes in multiple threads. The data shows difference of time spend on block fetch is also around 0.1s between spark 1.5.1 and spark 1.6.2. And it can be consistently repro. In our scenario, we called getRemoteBytes twice. If we use spark 1.6.2 it will bring additional 0.2s overhead and cause around 5 decrease of throughput. we would like to reduce the overhead if possible. was (Author: ding): I tried to call rdd.collect which internally called bm.getRemoteBytes in multiple threads. The data shows difference of time spend on block fetch is also around 0.1s between spark 1.5.1 and spark 1.6.2. And it can be consistently repro. In our scenario, we called getRemoteBytes twice. If we use spark 1.6.2 it will bring additional 0.2s overhead and cause around 5 decrease of throughput. we would like to reduce the overhead as much as possible. > BlockFetch with multiple threads slows down after spark 1.6 > --- > > Key: SPARK-17951 > URL: https://issues.apache.org/jira/browse/SPARK-17951 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions: 1.6.2 > Environment: cluster with 8 node, each node has 28 cores. 
10Gb network >Reporter: ding > > The following code demonstrates the issue: > {code} > def main(args: Array[String]): Unit = { > val conf = new SparkConf().setAppName(s"BMTest") > val size = 3344570 > val sc = new SparkContext(conf) > val data = sc.parallelize(1 to 100, 8) > var accum = sc.accumulator(0.0, "get remote bytes") > var i = 0 > while(i < 91) { > accum = sc.accumulator(0.0, "get remote bytes") > val test = data.mapPartitionsWithIndex { (pid, iter) => > val N = size > val bm = SparkEnv.get.blockManager > val blockId = TaskResultBlockId(10*i + pid) > val test = new Array[Byte](N) > Random.nextBytes(test) > val buffer = ByteBuffer.allocate(N) > buffer.limit(N) > buffer.put(test) > bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER) > Iterator(1) > }.count() > > data.mapPartitionsWithIndex { (pid, iter) => > val before = System.nanoTime() > > val bm = SparkEnv.get.blockManager > (0 to 7).map(s => { > Future { > val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s)) > } > }).map(Await.result(_, Duration.Inf)) > > accum.add((System.nanoTime() - before) / 1e9) > Iterator(1) > }.count() > println("get remote bytes take: " + accum.value/8) > i += 1 > } > } > {code} > In spark1.6.2, average of "getting remote bytes" time is: 0.19 s while > in spark 1.5.1 average of "getting remote bytes" time is: 0.09 s > However if fetch block in single thread, the gap is much smaller. > spark1.6.2 get remote bytes: 0.21 s > spark1.5.1 get remote bytes: 0.20 s -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6
[ https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15578987#comment-15578987 ] ding commented on SPARK-17951: -- I tried to call rdd.collect, which internally calls bm.getRemoteBytes in multiple threads. The data shows that the difference in time spent on block fetch is also around 0.1s between Spark 1.5.1 and Spark 1.6.2, and it can be consistently reproduced. In our scenario, we call getRemoteBytes twice; with Spark 1.6.2 this brings an additional 0.2s of overhead and causes around a 5% decrease in throughput. We would like to reduce the overhead as much as possible. > BlockFetch with multiple threads slows down after spark 1.6 > --- > > Key: SPARK-17951 > URL: https://issues.apache.org/jira/browse/SPARK-17951 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions: 1.6.2 > Environment: cluster with 8 nodes, each node has 28 cores. 10Gb network >Reporter: ding > > The following code demonstrates the issue: > {code} > def main(args: Array[String]): Unit = { > val conf = new SparkConf().setAppName(s"BMTest") > val size = 3344570 > val sc = new SparkContext(conf) > val data = sc.parallelize(1 to 100, 8) > var accum = sc.accumulator(0.0, "get remote bytes") > var i = 0 > while(i < 91) { > accum = sc.accumulator(0.0, "get remote bytes") > val test = data.mapPartitionsWithIndex { (pid, iter) => > val N = size > val bm = SparkEnv.get.blockManager > val blockId = TaskResultBlockId(10*i + pid) > val test = new Array[Byte](N) > Random.nextBytes(test) > val buffer = ByteBuffer.allocate(N) > buffer.limit(N) > buffer.put(test) > bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER) > Iterator(1) > }.count() > > data.mapPartitionsWithIndex { (pid, iter) => > val before = System.nanoTime() > > val bm = SparkEnv.get.blockManager > (0 to 7).map(s => { > Future { > val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s)) > } > }).map(Await.result(_, Duration.Inf)) > > 
accum.add((System.nanoTime() - before) / 1e9) > Iterator(1) > }.count() > println("get remote bytes take: " + accum.value/8) > i += 1 > } > } > {code} > In spark1.6.2, average of "getting remote bytes" time is: 0.19 s while > in spark 1.5.1 average of "getting remote bytes" time is: 0.09 s > However if fetch block in single thread, the gap is much smaller. > spark1.6.2 get remote bytes: 0.21 s > spark1.5.1 get remote bytes: 0.20 s -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
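The structure of the benchmark — fire eight fetches concurrently, block on all of them, and record the elapsed seconds — can be sketched in plain Java; `fetch` here is a hypothetical stand-in for `bm.getRemoteBytes`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FetchTiming {
    // Hypothetical stand-in for bm.getRemoteBytes(blockId): any blocking fetch.
    private static byte[] fetch(int blockId) {
        return new byte[3344570]; // same block size as in the report
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        long before = System.nanoTime();
        // Equivalent of (0 to 7).map(s => Future { bm.getRemoteBytes(...) })
        List<Future<byte[]>> pending = new ArrayList<>();
        for (int s = 0; s < 8; s++) {
            final int blockId = s;
            pending.add(pool.submit(() -> fetch(blockId)));
        }
        int fetched = 0;
        for (Future<byte[]> f : pending) { // Await.result(_, Duration.Inf)
            fetched += f.get().length > 0 ? 1 : 0;
        }
        double elapsedSec = (System.nanoTime() - before) / 1e9;
        pool.shutdown();
        System.out.println(fetched + " " + (elapsedSec > 0.0));
    }
}
```

This isolates the measurement pattern itself: any per-fetch overhead added by the block transfer service is multiplied by the concurrency, which is why the multi-threaded case exposes the regression while the single-threaded case barely does.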
[jira] [Updated] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6
[ https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ding updated SPARK-17951: - Description: The following code demonstrates the issue: def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(s"BMTest") val size = 3344570 val sc = new SparkContext(conf) val data = sc.parallelize(1 to 100, 8) var accum = sc.accumulator(0.0, "get remote bytes") var i = 0 while(i < 91) { accum = sc.accumulator(0.0, "get remote bytes") val test = data.mapPartitionsWithIndex { (pid, iter) => val N = size val bm = SparkEnv.get.blockManager val blockId = TaskResultBlockId(10*i + pid) val test = new Array[Byte](N) Random.nextBytes(test) val buffer = ByteBuffer.allocate(N) buffer.limit(N) buffer.put(test) bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER) Iterator(1) }.count() data.mapPartitionsWithIndex { (pid, iter) => val before = System.nanoTime() val bm = SparkEnv.get.blockManager (0 to 7).map(s => { Future { val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s)) } }).map(Await.result(_, Duration.Inf)) accum.add((System.nanoTime() - before) / 1e9) Iterator(1) }.count() println("get remote bytes take: " + accum.value/8) i += 1 } } In spark1.6.2, average of "getting remote bytes" time is: 0.19 s while in spark 1.5.1 average of "getting remote bytes" time is: 0.09 s However if fetch block in single thread, the gap is much smaller. 
spark1.6.2 get remote bytes: 0.21 s spark1.5.1 get remote bytes: 0.20 s was: The following code demonstrates the issue: def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(s"BMTest") val size = 3344570 val sc = new SparkContext(conf) val data = sc.parallelize(1 to 100, 8) var accum = sc.accumulator(0.0, "get remote bytes") var i = 0 while(i < 91) { accum = sc.accumulator(0.0, "get remote bytes") val test = data.mapPartitionsWithIndex { (pid, iter) => val N = size val bm = SparkEnv.get.blockManager val blockId = TaskResultBlockId(10*i + pid) val test = new Array[Byte](N) Random.nextBytes(test) val buffer = ByteBuffer.allocate(N) buffer.limit(N) buffer.put(test) bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER) Iterator(1) }.count() data.mapPartitionsWithIndex { (pid, iter) => val before = System.nanoTime() val bm = SparkEnv.get.blockManager (1 to 8).map(s => { Future { val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s)) } }).map(Await.result(_, Duration.Inf)) accum.add((System.nanoTime() - before) / 1e9) Iterator(1) }.count() println("get remote bytes take: " + accum.value/8) i += 1 } } In spark1.6.2, average of "getting remote bytes" time is: 0.16s while in spark 1.5.1 average of "getting remote bytes" time is: 0.07s However if fetch block in single thread, the gap is much smaller. spark1.6.2 get remote bytes: 0.191421s spark1.5.1 get remote bytes: 0.181312s > BlockFetch with multiple threads slows down after spark 1.6 > --- > > Key: SPARK-17951 > URL: https://issues.apache.org/jira/browse/SPARK-17951 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions: 1.6.2 > Environment: cluster with 8 node, each node has 28 cores. 
10Gb network >Reporter: ding > > The following code demonstrates the issue: > def main(args: Array[String]): Unit = { > val conf = new SparkConf().setAppName(s"BMTest") > val size = 3344570 > val sc = new SparkContext(conf) > val data = sc.parallelize(1 to 100, 8) > var accum = sc.accumulator(0.0, "get remote bytes") > var i = 0 > while(i < 91) { > accum = sc.accumulator(0.0, "get remote bytes") > val test = data.mapPartitionsWithIndex { (pid, iter) => > val N = size > val bm = SparkEnv.get.blockManager > val blockId = TaskResultBlockId(10*i + pid) > val test = new Array[Byte](N) > Random.nextBytes(test) > val buffer = ByteBuffer.allocate(N) > buffer.limit(N) > buffer.put(test) > bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER) > Iterator(1) > }.count() > > data.mapPartitionsWithIndex { (pid, iter) => > val before = System.nanoTime() > > val bm = SparkEnv.get.blockManager
[jira] [Created] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6
ding created SPARK-17951: Summary: BlockFetch with multiple threads slows down after spark 1.6 Key: SPARK-17951 URL: https://issues.apache.org/jira/browse/SPARK-17951 Project: Spark Issue Type: Bug Components: Block Manager, Spark Core Affects Versions: 1.6.2 Environment: cluster with 8 nodes, each node has 28 cores. 10Gb network Reporter: ding The following code demonstrates the issue:
{code:java}
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName(s"BMTest")
  val size = 3344570
  val sc = new SparkContext(conf)
  val data = sc.parallelize(1 to 100, 8)
  var accum = sc.accumulator(0.0, "get remote bytes")
  var i = 0
  while(i < 91) {
    accum = sc.accumulator(0.0, "get remote bytes")
    val test = data.mapPartitionsWithIndex { (pid, iter) =>
      val N = size
      val bm = SparkEnv.get.blockManager
      val blockId = TaskResultBlockId(10*i + pid)
      val test = new Array[Byte](N)
      Random.nextBytes(test)
      val buffer = ByteBuffer.allocate(N)
      buffer.limit(N)
      buffer.put(test)
      bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
      Iterator(1)
    }.count()
    data.mapPartitionsWithIndex { (pid, iter) =>
      val before = System.nanoTime()
      val bm = SparkEnv.get.blockManager
      (1 to 8).map(s => {
        Future {
          val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s))
        }
      }).map(Await.result(_, Duration.Inf))
      accum.add((System.nanoTime() - before) / 1e9)
      Iterator(1)
    }.count()
    println("get remote bytes take: " + accum.value/8)
    i += 1
  }
}
{code}
In spark 1.6.2, the average "getting remote bytes" time is 0.16s, while in spark 1.5.1 the average "getting remote bytes" time is 0.07s. However, if blocks are fetched in a single thread, the gap is much smaller: spark 1.6.2 get remote bytes: 0.191421s; spark 1.5.1 get remote bytes: 0.181312s.
[jira] [Commented] (SPARK-17559) PeriodicGraphCheckpointer did not persist edges as expected in some cases
[ https://issues.apache.org/jira/browse/SPARK-17559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15545991#comment-15545991 ] ding commented on SPARK-17559: -- Yes, it's the correct username. Thank you for reviewing. > PeriodicGraphCheckpointer did not persist edges as expected in some cases > - > > Key: SPARK-17559 > URL: https://issues.apache.org/jira/browse/SPARK-17559 > Project: Spark > Issue Type: Bug > Components: MLlib >Reporter: ding >Assignee: dingding >Priority: Minor > Fix For: 2.0.2, 2.1.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > When using PeriodicGraphCheckpointer to persist a graph, sometimes the edges are not persisted: the graph is currently persisted only when the vertices' storage level is none, but there is a chance the vertices' storage level is not none while the edges' level is none. E.g., for a graph created by an outerJoinVertices operation, the vertices are automatically cached while the edges are not, so the edges will not be persisted when PeriodicGraphCheckpointer is used. See the minimal example below: > {code:java} > val graphCheckpointer = new PeriodicGraphCheckpointer[Array[String], Int](2, sc) > val users = sc.textFile("data/graphx/users.txt") > .map(line => line.split(",")).map(parts => (parts.head.toLong, parts.tail)) > val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt") > val graph = followerGraph.outerJoinVertices(users) { > case (uid, deg, Some(attrList)) => attrList > case (uid, deg, None) => Array.empty[String] > } > graphCheckpointer.update(graph) > {code}
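The bug boils down to a single predicate: persistence was gated on the vertices' storage level alone, so the vertices-cached/edges-uncached state produced by outerJoinVertices slipped through. A toy Java model of the two checks (a sketch of the reported logic, not GraphX's actual code):

```java
public class PersistCheck {
    enum StorageLevel { NONE, MEMORY_ONLY }

    // The behavior described in the report: persist only if vertices are unpersisted.
    static boolean persistsEdgesBuggy(StorageLevel vertices, StorageLevel edges) {
        return vertices == StorageLevel.NONE;
    }

    // The fix: decide for the edges based on the edges' own storage level.
    static boolean persistsEdgesFixed(StorageLevel vertices, StorageLevel edges) {
        return edges == StorageLevel.NONE;
    }

    public static void main(String[] args) {
        // After outerJoinVertices: vertices auto-cached, edges not.
        StorageLevel vertices = StorageLevel.MEMORY_ONLY;
        StorageLevel edges = StorageLevel.NONE;
        System.out.println(persistsEdgesBuggy(vertices, edges)); // false: edges silently skipped
        System.out.println(persistsEdgesFixed(vertices, edges)); // true: edges get persisted
    }
}
```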
[jira] [Commented] (SPARK-17097) Pregel does not keep vertex state properly; fails to terminate
[ https://issues.apache.org/jira/browse/SPARK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533976#comment-15533976 ] ding commented on SPARK-17097: -- Because diff of case class behaves different with regular class. A case class implements the equals method while a class does not. When comparing two objects implemented as a class is actually comparing the memory address of the objects. In above code, if we remove "case", after transform, the original vertices is still different with the new generated vertices although they have the same value. In this way, EdgeTriplet is able to be updated since there is difference and after 1 iteration there will be no active message and the application will terminate. > Pregel does not keep vertex state properly; fails to terminate > --- > > Key: SPARK-17097 > URL: https://issues.apache.org/jira/browse/SPARK-17097 > Project: Spark > Issue Type: Bug > Components: GraphX >Affects Versions: 1.6.0 > Environment: Scala 2.10.5, Spark 1.6.0 with GraphX and Pregel >Reporter: Seth Bromberger > > Consider the following minimum example: > {code:title=PregelBug.scala|borderStyle=solid} > package testGraph > import org.apache.spark.{SparkConf, SparkContext} > import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, _} > object PregelBug { > def main(args: Array[String]) = { > //FIXME breaks if TestVertex is a case class; works if not case class > case class TestVertex(inId: VertexId, > inData: String, > inLabels: collection.mutable.HashSet[String]) extends > Serializable { > val id = inId > val value = inData > val labels = inLabels > } > class TestLink(inSrc: VertexId, inDst: VertexId, inData: String) extends > Serializable { > val src = inSrc > val dst = inDst > val data = inData > } > val startString = "XXXSTARTXXX" > val conf = new SparkConf().setAppName("pregeltest").setMaster("local[*]") > val sc = new SparkContext(conf) > val vertexes = Vector( > new TestVertex(0, "label0", 
collection.mutable.HashSet[String]()), > new TestVertex(1, "label1", collection.mutable.HashSet[String]()) > ) > val links = Vector( > new TestLink(0, 1, "linkData01") > ) > val vertexes_packaged = vertexes.map(v => (v.id, v)) > val links_packaged = links.map(e => Edge(e.src, e.dst, e)) > val graph = Graph[TestVertex, > TestLink](sc.parallelize(vertexes_packaged), sc.parallelize(links_packaged)) > def vertexProgram (vertexId: VertexId, vdata: TestVertex, message: > Vector[String]): TestVertex = { > message.foreach { > case `startString` => > if (vdata.id == 0L) > vdata.labels.add(vdata.value) > case m => > if (!vdata.labels.contains(m)) > vdata.labels.add(m) > } > new TestVertex(vdata.id, vdata.value, vdata.labels) > } > def sendMessage (triplet: EdgeTriplet[TestVertex, TestLink]): > Iterator[(VertexId, Vector[String])] = { > val srcLabels = triplet.srcAttr.labels > val dstLabels = triplet.dstAttr.labels > val msgsSrcDst = srcLabels.diff(dstLabels) > .map(label => (triplet.dstAttr.id, Vector[String](label))) > val msgsDstSrc = dstLabels.diff(dstLabels) > .map(label => (triplet.srcAttr.id, Vector[String](label))) > msgsSrcDst.toIterator ++ msgsDstSrc.toIterator > } > def mergeMessage (m1: Vector[String], m2: Vector[String]): Vector[String] > = m1.union(m2).distinct > val g = graph.pregel(Vector[String](startString))(vertexProgram, > sendMessage, mergeMessage) > println("---pregel done---") > println("vertex info:") > g.vertices.foreach( > v => { > val labels = v._2.labels > println( > "vertex " + v._1 + > ": name = " + v._2.id + > ", labels = " + labels) > } > ) > } > } > {code} > This code never terminates even though we expect it to. To fix, we simply > remove the "case" designation for the TestVertex class (see FIXME comment), > and then it behaves as expected. > (Apologies if this has been fixed in later versions; we're unfortunately > pegged to 2.10.5 / 1.6.0 for now.) 
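The case-class detail at the heart of this issue is plain JVM equality semantics: a Scala case class generates a structural equals/hashCode, while a regular class inherits Object's reference equality. A Java analogue of the two TestVertex variants:

```java
import java.util.Objects;

public class EqualitySemantics {
    // Analogue of `case class TestVertex`: structural equality over the fields.
    static final class CaseLike {
        final long id;
        final String value;
        CaseLike(long id, String value) { this.id = id; this.value = value; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof CaseLike)) return false;
            CaseLike c = (CaseLike) o;
            return c.id == id && c.value.equals(value);
        }
        @Override public int hashCode() { return Objects.hash(id, value); }
    }

    // Analogue of a plain `class TestVertex`: Object's reference equality.
    static final class PlainLike {
        final long id;
        final String value;
        PlainLike(long id, String value) { this.id = id; this.value = value; }
    }

    public static void main(String[] args) {
        // Distinct objects carrying identical field values.
        System.out.println(new CaseLike(0, "label0").equals(new CaseLike(0, "label0")));
        System.out.println(new PlainLike(0, "label0").equals(new PlainLike(0, "label0")));
    }
}
```

With reference equality, even the in-place mutation in vertexProgram leaves the old and new vertices looking "different", which is why removing `case` masks the bug rather than fixing the underlying mutation.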
[jira] [Commented] (SPARK-5484) Pregel should checkpoint periodically to avoid StackOverflowError
[ https://issues.apache.org/jira/browse/SPARK-5484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15496857#comment-15496857 ] ding commented on SPARK-5484: - Thank you for your kindly reminder. However as the code is almost ready, I will still send PR in case someone has interest to review it. > Pregel should checkpoint periodically to avoid StackOverflowError > - > > Key: SPARK-5484 > URL: https://issues.apache.org/jira/browse/SPARK-5484 > Project: Spark > Issue Type: Bug > Components: GraphX >Reporter: Ankur Dave >Assignee: Ankur Dave > > Pregel-based iterative algorithms with more than ~50 iterations begin to slow > down and eventually fail with a StackOverflowError due to Spark's lack of > support for long lineage chains. Instead, Pregel should checkpoint the graph > periodically. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
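The shape of the fix is the standard one for long-lineage iterative jobs: every N supersteps, checkpoint the current graph so the lineage chain is truncated. A sketch of the loop structure (checkpoint() here is a hypothetical stand-in for Graph/RDD checkpointing, and the interval value is illustrative):

```java
public class PeriodicCheckpoint {
    static int checkpoints = 0;

    // Hypothetical stand-in for graph.checkpoint(): materialize state, cut lineage.
    static void checkpoint(int iteration) {
        checkpoints++;
    }

    public static void main(String[] args) {
        int checkpointInterval = 25; // checkpoint every N iterations
        int iterations = 100;        // Pregel-style supersteps; >~50 is where StackOverflowError hit
        for (int i = 1; i <= iterations; i++) {
            // ... one superstep: join messages, run vprog, compute new messages ...
            if (i % checkpointInterval == 0) {
                checkpoint(i); // lineage depth is now bounded by checkpointInterval
            }
        }
        System.out.println(checkpoints);
    }
}
```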
[jira] [Created] (SPARK-17559) PeriodicGraphCheckpointer did not persist edges as expected in some cases
ding created SPARK-17559: Summary: PeriodicGraphCheckpointer did not persist edges as expected in some cases Key: SPARK-17559 URL: https://issues.apache.org/jira/browse/SPARK-17559 Project: Spark Issue Type: Bug Components: MLlib Reporter: ding Priority: Minor When using PeriodicGraphCheckpointer to persist a graph, sometimes the edges are not persisted: the graph is currently persisted only when the vertices' storage level is none, but there is a chance the vertices' storage level is not none while the edges' level is none. E.g., for a graph created by an outerJoinVertices operation, the vertices are automatically cached while the edges are not, so the edges will not be persisted when PeriodicGraphCheckpointer is used. See the minimal example below:
{code:java}
val graphCheckpointer = new PeriodicGraphCheckpointer[Array[String], Int](2, sc)
val users = sc.textFile("data/graphx/users.txt")
  .map(line => line.split(",")).map(parts => (parts.head.toLong, parts.tail))
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
val graph = followerGraph.outerJoinVertices(users) {
  case (uid, deg, Some(attrList)) => attrList
  case (uid, deg, None) => Array.empty[String]
}
graphCheckpointer.update(graph)
{code}
[jira] [Commented] (SPARK-5484) Pregel should checkpoint periodically to avoid StackOverflowError
[ https://issues.apache.org/jira/browse/SPARK-5484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15495131#comment-15495131 ] ding commented on SPARK-5484: - I will work on the issue if nobody took it. > Pregel should checkpoint periodically to avoid StackOverflowError > - > > Key: SPARK-5484 > URL: https://issues.apache.org/jira/browse/SPARK-5484 > Project: Spark > Issue Type: Bug > Components: GraphX >Reporter: Ankur Dave >Assignee: Ankur Dave > > Pregel-based iterative algorithms with more than ~50 iterations begin to slow > down and eventually fail with a StackOverflowError due to Spark's lack of > support for long lineage chains. Instead, Pregel should checkpoint the graph > periodically. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17097) Pregel does not keep vertex state properly; fails to terminate
[ https://issues.apache.org/jira/browse/SPARK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15488609#comment-15488609 ] ding commented on SPARK-17097: -- I am afraid the attached sample code failing to terminate with a case class is not caused by a Pregel or GraphX bug. In the sample code, the vertex state (inLabels here) is updated in place, which is not supposed to happen. After the transform operations, the original vertices are therefore also updated, and when VD is a case class they have exactly the same value as the newly generated vertices. As a result the EdgeTriplet fails to update because there is no difference between the vertices: the EdgeTriplet's dstLabels is always empty while srcLabels contains a value, so there is always an active message and Pregel never terminates. One way to fix the problem is to remove the in-place update in vertexProgram by cloning the labels and applying the update to the clone. I have tried the code below and it works:
{code:java}
def vertexProgram(vertexId: VertexId, vdata: TestVertex, message: Vector[String]): TestVertex = {
  val labels = vdata.labels.clone()
  message.foreach {
    case `startString` =>
      if (vdata.id == 0L)
        labels.add(vdata.value)
    case m =>
      if (!vdata.labels.contains(m))
        labels.add(m)
  }
  new TestVertex(vdata.id, vdata.value, labels)
}
{code}
Hope this information is helpful to you. 
> Pregel does not keep vertex state properly; fails to terminate > --- > > Key: SPARK-17097 > URL: https://issues.apache.org/jira/browse/SPARK-17097 > Project: Spark > Issue Type: Bug > Components: GraphX >Affects Versions: 1.6.0 > Environment: Scala 2.10.5, Spark 1.6.0 with GraphX and Pregel >Reporter: Seth Bromberger > > Consider the following minimum example: > {code:title=PregelBug.scala|borderStyle=solid} > package testGraph > import org.apache.spark.{SparkConf, SparkContext} > import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, _} > object PregelBug { > def main(args: Array[String]) = { > //FIXME breaks if TestVertex is a case class; works if not case class > case class TestVertex(inId: VertexId, > inData: String, > inLabels: collection.mutable.HashSet[String]) extends > Serializable { > val id = inId > val value = inData > val labels = inLabels > } > class TestLink(inSrc: VertexId, inDst: VertexId, inData: String) extends > Serializable { > val src = inSrc > val dst = inDst > val data = inData > } > val startString = "XXXSTARTXXX" > val conf = new SparkConf().setAppName("pregeltest").setMaster("local[*]") > val sc = new SparkContext(conf) > val vertexes = Vector( > new TestVertex(0, "label0", collection.mutable.HashSet[String]()), > new TestVertex(1, "label1", collection.mutable.HashSet[String]()) > ) > val links = Vector( > new TestLink(0, 1, "linkData01") > ) > val vertexes_packaged = vertexes.map(v => (v.id, v)) > val links_packaged = links.map(e => Edge(e.src, e.dst, e)) > val graph = Graph[TestVertex, > TestLink](sc.parallelize(vertexes_packaged), sc.parallelize(links_packaged)) > def vertexProgram (vertexId: VertexId, vdata: TestVertex, message: > Vector[String]): TestVertex = { > message.foreach { > case `startString` => > if (vdata.id == 0L) > vdata.labels.add(vdata.value) > case m => > if (!vdata.labels.contains(m)) > vdata.labels.add(m) > } > new TestVertex(vdata.id, vdata.value, vdata.labels) > } > def sendMessage (triplet: 
EdgeTriplet[TestVertex, TestLink]): > Iterator[(VertexId, Vector[String])] = { > val srcLabels = triplet.srcAttr.labels > val dstLabels = triplet.dstAttr.labels > val msgsSrcDst = srcLabels.diff(dstLabels) > .map(label => (triplet.dstAttr.id, Vector[String](label))) > val msgsDstSrc = dstLabels.diff(dstLabels) > .map(label => (triplet.srcAttr.id, Vector[String](label))) > msgsSrcDst.toIterator ++ msgsDstSrc.toIterator > } > def mergeMessage (m1: Vector[String], m2: Vector[String]): Vector[String] > = m1.union(m2).distinct > val g = graph.pregel(Vector[String](startString))(vertexProgram, > sendMessage, mergeMessage) > println("---pregel done---") > println("vertex info:") > g.vertices.foreach( > v => { > val labels = v._2.labels > println( > "vertex " + v._1 + > ": name = " + v._2.id + > ", labels = " + labels) > } > ) > } > } > {code} > This code never terminates even though we expect it to. To fix, we simply > remove the "case" designation for the TestVertex class (see FIXME comment),
[jira] [Commented] (SPARK-16071) Not sufficient array size checks to avoid integer overflows in Tungsten
[ https://issues.apache.org/jira/browse/SPARK-16071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341354#comment-15341354 ] ding commented on SPARK-16071: -- OK, I see. Thank you for your clarification. > Not sufficient array size checks to avoid integer overflows in Tungsten > --- > > Key: SPARK-16071 > URL: https://issues.apache.org/jira/browse/SPARK-16071 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Xiangrui Meng >Priority: Critical > > Several bugs have been found caused by integer overflows in Tungsten. This > JIRA is for taking a final pass before 2.0 release to reduce potential bugs > and issues. We should do at least the following: > * Raise exception early instead of later throwing NegativeArraySize (which is > slow and might cause silent errors) > * Document clearly the largest array size we support in DataFrames. > To reproduce one of the issues: > {code} > val n = 1e8.toInt // try 2e8, 3e8 > sc.parallelize(0 until 1, 1).map(i => new > Array[Int](n)).toDS.map(_.size).show() > {code} > Result: > * n=1e8: correct but slow (see SPARK-16043) > * n=2e8: NegativeArraySize exception > {code:none} > java.lang.NegativeArraySizeException > at > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:61) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown > Source) > at > org.apache.spark.sql.execution.RDDScanExec$$anonfun$doExecute$1$$anonfun$apply$3.apply(ExistingRDD.scala:123) > at > org.apache.spark.sql.execution.RDDScanExec$$anonfun$doExecute$1$$anonfun$apply$3.apply(ExistingRDD.scala:121) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:780) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:780) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > * n=3e8: NegativeArraySize exception but raised at a different location > {code:none} > java.lang.RuntimeException: Error while encoding: > java.lang.NegativeArraySizeException > newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) AS > value#108 > +- newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) >+- input[0, [I, true] > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:257) > at > org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:430) > at > org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:430) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > 
at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:780) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:780) > at >
[jira] [Commented] (SPARK-16071) Not sufficient array size checks to avoid integer overflows in Tungsten
[ https://issues.apache.org/jira/browse/SPARK-16071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341318#comment-15341318 ] ding commented on SPARK-16071: -- The exceptions are raised in different locations: one happens in the encoder (n = 3e8) and the other while executing the logical plan (n = 2e8) during createDataset. Both exceptions are thrown from the grow function in BufferHolder. I think we can add an array size check there. I will send a PR later. > Not sufficient array size checks to avoid integer overflows in Tungsten > --- > > Key: SPARK-16071 > URL: https://issues.apache.org/jira/browse/SPARK-16071 > (description and reproduction quoted above)
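The comment above proposes checking the requested array size early rather than letting allocation fail later with NegativeArraySizeException. A minimal sketch of such a check in plain Scala (the object and method names here are illustrative; the actual fix belongs in Spark's Java BufferHolder.grow):

```scala
// Illustrative early size check: reject oversized requests with a clear
// error instead of letting `new Array[Byte](negativeInt)` throw
// NegativeArraySizeException deep inside generated code.
object BufferGrowth {
  // JVM arrays cannot exceed roughly Int.MaxValue elements; leave a small
  // margin for the array header.
  val MaxArraySize: Int = Int.MaxValue - 8

  def grownSize(cursor: Int, neededSize: Int): Int = {
    // Do the arithmetic in Long so the sum itself cannot overflow.
    val totalNeeded = cursor.toLong + neededSize.toLong
    if (neededSize < 0 || totalNeeded > MaxArraySize) {
      throw new UnsupportedOperationException(
        s"Cannot grow buffer to $totalNeeded bytes: limit is $MaxArraySize")
    }
    // Double the buffer until it fits, capped at the maximum array size.
    math.min(math.max(cursor.toLong * 2, totalNeeded), MaxArraySize.toLong).toInt
  }
}
```

Failing fast this way also gives the user an actionable message instead of a codegen stack trace.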
[jira] [Commented] (SPARK-15562) Temp directory is not deleted after program exit in DataFrameExample
[ https://issues.apache.org/jira/browse/SPARK-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303326#comment-15303326 ] ding commented on SPARK-15562: -- OK, I will check whether there are similar JIRAs before creating a new one next time. > Temp directory is not deleted after program exit in DataFrameExample > > > Key: SPARK-15562 > URL: https://issues.apache.org/jira/browse/SPARK-15562 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 1.6.0 >Reporter: ding >Priority: Minor > > The temp directory used to save records is not deleted after program exit in > DataFrameExample. Although deleteOnExit is called, it does not work because the > directory is not empty. Something similar happens in ContextCleanerSuite. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-15562) Temp directory is not deleted after program exit in DataFrameExample
ding created SPARK-15562: Summary: Temp directory is not deleted after program exit in DataFrameExample Key: SPARK-15562 URL: https://issues.apache.org/jira/browse/SPARK-15562 Project: Spark Issue Type: Bug Components: ML Affects Versions: 1.6.0 Reporter: ding Priority: Minor The temp directory used to save records is not deleted after program exit in DataFrameExample. Although deleteOnExit is called, it does not work because the directory is not empty. Something similar happens in ContextCleanerSuite. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
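For context, java.io.File.deleteOnExit only removes a directory if it is empty at JVM shutdown. One workaround is a recursive delete, run explicitly or from a shutdown hook; a minimal sketch (helper names are mine, not the example's actual fix):

```scala
import java.io.File
import java.nio.file.Files

object TempDirs {
  // Delete children before the directory itself; a plain delete() (and
  // therefore deleteOnExit()) fails on non-empty directories.
  def deleteRecursively(f: File): Boolean = {
    if (f.isDirectory) {
      // listFiles can return null if the path disappeared concurrently
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    f.delete()
  }

  // Create a temp directory that is cleaned up at JVM exit even when it
  // still contains files.
  def createCleanedTempDir(prefix: String): File = {
    val dir = Files.createTempDirectory(prefix).toFile
    sys.addShutdownHook(deleteRecursively(dir))
    dir
  }
}
```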
[jira] [Created] (SPARK-15172) Warning message should explicitly tell user initial coefficients is ignored if its size doesn't match expected size in LogisticRegression
ding created SPARK-15172: Summary: Warning message should explicitly tell user initial coefficients is ignored if its size doesn't match expected size in LogisticRegression Key: SPARK-15172 URL: https://issues.apache.org/jira/browse/SPARK-15172 Project: Spark Issue Type: Improvement Components: ML Reporter: ding Priority: Trivial From the ML LogisticRegression code logic, if the size of the initial coefficients does not match the expected size, the initial coefficients are ignored. We should tell the user this explicitly. Besides, logging the size of the initial coefficients is more straightforward than logging their values when a size mismatch happens. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
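The change being asked for is small: state explicitly that the coefficients are ignored, and log the sizes rather than the vectors. A hedged sketch (the helper and its wiring are hypothetical, not the actual LogisticRegression code):

```scala
object InitialCoefficients {
  // Returns true when the supplied initial coefficients are usable.
  // On a size mismatch, warn with both sizes; logging the full vector
  // would be verbose and less informative.
  def check(initialSize: Int, expectedSize: Int,
      logWarning: String => Unit): Boolean = {
    if (initialSize != expectedSize) {
      logWarning(
        s"Initial coefficients will be ignored: size $initialSize does not " +
        s"match the expected size $expectedSize.")
      false // caller falls back to zero coefficients
    } else true
  }
}
```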
[jira] [Reopened] (SPARK-14969) Remove unnecessary compute function in LogisticGradient
[ https://issues.apache.org/jira/browse/SPARK-14969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ding reopened SPARK-14969: -- > Remove unnecessary compute function in LogisticGradient > --- > > Key: SPARK-14969 > URL: https://issues.apache.org/jira/browse/SPARK-14969 > Project: Spark > Issue Type: Improvement > Components: MLlib >Reporter: ding >Priority: Trivial > > The compute function is already implemented in the base class Gradient. There > is no need to repeat the same implementation of compute in the derived class > LogisticGradient. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-14969) Remove unnecessary compute function in LogisticGradient
[ https://issues.apache.org/jira/browse/SPARK-14969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ding resolved SPARK-14969. -- Resolution: Fixed > Remove unnecessary compute function in LogisticGradient > --- > > Key: SPARK-14969 > URL: https://issues.apache.org/jira/browse/SPARK-14969 > Project: Spark > Issue Type: Improvement > Components: MLlib >Reporter: ding >Priority: Trivial > > The compute function is already implemented in the base class Gradient. There > is no need to repeat the same implementation of compute in the derived class > LogisticGradient. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-14969) Remove unnecessary compute function in LogisticGradient
ding created SPARK-14969: Summary: Remove unnecessary compute function in LogisticGradient Key: SPARK-14969 URL: https://issues.apache.org/jira/browse/SPARK-14969 Project: Spark Issue Type: Improvement Components: MLlib Reporter: ding Priority: Trivial The compute function is already implemented in the base class Gradient. There is no need to repeat the same implementation of compute in the derived class LogisticGradient. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14889) scala.MatchError: NONE (of class scala.Enumeration$Val) when spark.scheduler.mode=NONE
[ https://issues.apache.org/jira/browse/SPARK-14889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15256061#comment-15256061 ] ding commented on SPARK-14889: -- @Jacek Laskowski, do you mind if I take the jira and give a fix to the issue? > scala.MatchError: NONE (of class scala.Enumeration$Val) when > spark.scheduler.mode=NONE > -- > > Key: SPARK-14889 > URL: https://issues.apache.org/jira/browse/SPARK-14889 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.0.0 >Reporter: Jacek Laskowski >Priority: Minor > > When {{TaskSchedulerImpl}} is initialized it pattern matches on acceptable > scheduling modes - {{FIFO}} and {{FAIR}} modes - but misses {{NONE}}. > It should at least pattern match the case and throw a more meaningful > exception. > {code} > ➜ spark git:(master) ✗ ./bin/spark-shell -c spark.scheduler.mode=NONE > 16/04/25 09:15:00 ERROR SparkContext: Error initializing SparkContext. > scala.MatchError: NONE (of class scala.Enumeration$Val) > at org.apache.spark.scheduler.Pool.(Pool.scala:53) > at > org.apache.spark.scheduler.TaskSchedulerImpl.initialize(TaskSchedulerImpl.scala:131) > at > org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2352) > at org.apache.spark.SparkContext.(SparkContext.scala:492) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
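The report's suggestion, pattern matching NONE explicitly and raising a meaningful error, can be sketched like this (a simplified stand-in for the match in Pool/TaskSchedulerImpl; the returned strings are placeholders):

```scala
object Scheduling {
  // Mirrors org.apache.spark.scheduler.SchedulingMode
  object Mode extends Enumeration { val FAIR, FIFO, NONE = Value }

  // Exhaustive match: NONE produces an actionable error instead of an
  // unhandled scala.MatchError during SparkContext initialization.
  def algorithmFor(mode: Mode.Value): String = mode match {
    case Mode.FAIR => "FairSchedulingAlgorithm"
    case Mode.FIFO => "FIFOSchedulingAlgorithm"
    case Mode.NONE => throw new IllegalArgumentException(
      "Unsupported spark.scheduler.mode: NONE. Use FAIR or FIFO.")
  }
}
```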
[jira] [Created] (SPARK-13378) Add tee method to RDD
Richard Ding created SPARK-13378: Summary: Add tee method to RDD Key: SPARK-13378 URL: https://issues.apache.org/jira/browse/SPARK-13378 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.6.0 Reporter: Richard Ding In our application, we sometimes need to save partial/intermediate results to side files in the middle of a data pipeline/DAG. The only way to do this now is the saveAsTextFile method, which only runs at the end of a pipeline; otherwise multiple jobs are needed. We've implemented a 'tee' method on RDD that is similar to the Unix tee utility. Below are the proposed methods: {code} def tee(path: String): RDD[T] {code} Return a new RDD that is the same as this RDD but also saves a copy of this RDD to a text file, using the string representation of elements. {code} def tee(path: String, f: (T) => Boolean): RDD[T] {code} Return a new RDD that is the same as this RDD but also saves to a text file a copy of the elements in this RDD that satisfy a predicate, using the string representation of elements. These methods can be used in RDD pipelines in ways similar to the tee utility in a Unix command pipeline, for example: {code} sc.textFile(dataFile).map(x => x.split("\t")) .map(x => (x(0), x(1).toInt, x(2))) .tee("output/tee-data-1") .tee("output/tee-data-2", x => x._2 >= 10) .groupBy(x => x._1) .saveAsTextFile("output/out-data") {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
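One way to prototype the proposed API today is an enrichment class over RDD. This sketch assumes a running Spark application; note that the eager saveAsTextFile call triggers an extra job per tee, whereas the proposal presumably wants the side write folded into the same job:

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical prototype of the proposed tee API; not the JIRA's
// implementation, and the caching policy is an illustrative choice.
object TeeSyntax {
  implicit class TeeOps[T: ClassTag](rdd: RDD[T]) {
    // Save a text copy of the whole RDD at `path`, then pass it through.
    def tee(path: String): RDD[T] = tee(path, (_: T) => true)

    // Save only the elements satisfying `f`; the returned RDD still
    // contains every element.
    def tee(path: String, f: T => Boolean): RDD[T] = {
      rdd.cache() // avoid recomputing the upstream pipeline twice
      rdd.filter(f).map(_.toString).saveAsTextFile(path) // runs a job now
      rdd
    }
  }
}
```

With `import TeeSyntax._` the pipeline from the description works as written, at the cost of one extra job per tee call.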
[jira] [Commented] (SPARK-5556) Latent Dirichlet Allocation (LDA) using Gibbs sampler
[ https://issues.apache.org/jira/browse/SPARK-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738090#comment-14738090 ] ding commented on SPARK-5556: - We have made it a Spark package, and it can be found here: http://spark-packages.org/package/intel-analytics/TopicModeling > Latent Dirichlet Allocation (LDA) using Gibbs sampler > -- > > Key: SPARK-5556 > URL: https://issues.apache.org/jira/browse/SPARK-5556 > Project: Spark > Issue Type: New Feature > Components: MLlib >Reporter: Guoqiang Li >Assignee: Pedro Rodriguez > Attachments: LDA_test.xlsx, spark-summit.pptx > > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5556) Latent Dirichlet Allocation (LDA) using Gibbs sampler
[ https://issues.apache.org/jira/browse/SPARK-5556?page=com.atlassian.jira.plugin.system.issuetabpanelfocusedCommentId=14696677#comment-14696677 ] ding commented on SPARK-5556: - The code can be found at https://github.com/intel-analytics/TopicModeling. There is an example in the package; you can try Gibbs-sampling LDA or online LDA by setting --optimizer to gibbs or online. Latent Dirichlet Allocation (LDA) using Gibbs sampler -- Key: SPARK-5556 URL: https://issues.apache.org/jira/browse/SPARK-5556 Project: Spark Issue Type: New Feature Components: MLlib Reporter: Guoqiang Li Assignee: Pedro Rodriguez Attachments: LDA_test.xlsx, spark-summit.pptx -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-5418) Output directory for shuffle should consider left space of each directory set in conf
ding created SPARK-5418: --- Summary: Output directory for shuffle should consider left space of each directory set in conf Key: SPARK-5418 URL: https://issues.apache.org/jira/browse/SPARK-5418 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.2.0 Environment: Ubuntu, others should be similar Reporter: ding Priority: Minor I set multiple directories in spark.local.dir as scratch space; one of them (e.g. /mnt/disk1) has 30 GB of free space while the others (e.g. /mnt/disk2) have 100 GB. In the current version, Spark uses a hash to pick the directory used for scratch space, which gives each directory the same chance. After hundreds of PageRank iterations, a "No space left" exception is thrown and the driver crashes. This does not make sense, since there is still 70+ GB of free space in the other directories. We should take the free space of each directory into account when choosing the map output directory. I will send a PR for this. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
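The proposed selection policy, weighting directories by remaining space rather than hashing uniformly, can be sketched in plain Scala (the caller supplies the free-space figures, which Spark would obtain via File.getUsableSpace; names are illustrative):

```scala
object ScratchDirs {
  // Pick a directory with probability proportional to its free space, so a
  // nearly full disk receives proportionally less shuffle output.
  // `roll` is a uniform random number in [0, 1).
  def choose(dirs: Seq[String], freeBytes: Seq[Long], roll: Double): String = {
    require(dirs.nonEmpty && dirs.length == freeBytes.length)
    require(roll >= 0.0 && roll < 1.0)
    val total = freeBytes.map(_.max(0L)).sum.toDouble
    if (total == 0) return dirs.head // no space anywhere; fail later with a clear error
    var acc = 0.0
    for ((dir, free) <- dirs.zip(freeBytes)) {
      acc += free.max(0L) / total
      if (roll < acc) return dir
    }
    dirs.last // guard against floating-point rounding at the top end
  }
}
```

With the 30 GB / 100 GB example from the report, the nearly full disk would receive roughly a quarter of the files instead of half.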
[jira] [Commented] (SPARK-4105) FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-4105?page=com.atlassian.jira.plugin.system.issuetabpanelfocusedCommentId=14290961#comment-14290961 ] ding commented on SPARK-4105: - I hit this error when running PageRank (it cannot be reproduced consistently; I only hit it once). I am not using the KryoSerializer; I am using the default serializer. The Spark code was taken from trunk at 2015/1/19, which should be later than Spark 1.2.0. 15/01/23 23:32:57 WARN scheduler.TaskSetManager: Lost task 347.0 in stage 9461.0 (TID 302687, sr213): FetchFailed(BlockManagerId(13, sr207, 49805), shuffleId=399, mapId=461, reduceId=347, message= org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.graphx.impl.VertexPartitionBaseOps.aggregateUsingIndex(VertexPartitionBaseOps.scala:207) at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$5$$anonfun$apply$4.apply(VertexRDDImpl.scala:171) at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$5$$anonfun$apply$4.apply(VertexRDDImpl.scala:171) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:113) at 
org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:111) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:65) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264) at org.apache.spark.rdd.RDD.iterator(RDD.scala:231) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264) at org.apache.spark.rdd.RDD.iterator(RDD.scala:231) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444) at org.xerial.snappy.Snappy.uncompress(Snappy.java:480) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92) at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:143) at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1165) at 
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300) at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:299) at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) at scala.util.Try$.apply(Try.scala:161) at scala.util.Success.map(Try.scala:206) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:299) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle - Key: SPARK-4105 URL: