[jira] [Commented] (SPARK-36240) Graceful termination of Spark Structured Streaming queries

2021-11-26 Thread yufei ding (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17449458#comment-17449458
 ] 

yufei ding commented on SPARK-36240:


In which version of Spark is this feature expected to be available?

> Graceful termination of Spark Structured Streaming queries
> --
>
> Key: SPARK-36240
> URL: https://issues.apache.org/jira/browse/SPARK-36240
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.1.2
> Environment:  
>  
>Reporter: Zoltán Zvara
>Priority: Major
>
> Spark Streaming provides a way to gracefully stop the streaming application 
> using the configuration parameter 
> {{spark.streaming.stopGracefullyOnShutdown}}. The configuration states:
> {quote}If {{true}}, Spark shuts down the {{StreamingContext}} gracefully on 
> JVM shutdown rather than immediately.
> {quote}
> This effectively stops job generation (see {{JobGenerator}} in Spark 
> Streaming) and lets the current {{Job}} (corresponding to a micro-batch) 
> finish instead of canceling the active job itself.
> Some applications require graceful stopping so that their output remains 
> consistent: output that is written out halfway causes serious problems for 
> applications that need "exactly-once" semantics.
> Structured Streaming currently has no support for gracefully stopping 
> queries or streaming applications.
> Naive solutions found on the web propose checking whether the query is still 
> active using {{query.isActive}}, or inspecting the query state directly, and 
> then attempting to call {{stop()}} at the right time. With the current 
> Structured Streaming implementation, {{stop()}} cancels the job group, which 
> may still lead to inconsistent output because the result depends on the 
> timing of the cancellation.
> _Proposed solution:_
> Strictly speaking, in the context of the micro-batch execution model, a 
> {{StreamingQuery}} that we want to stop gracefully would be backed by the 
> {{MicroBatchExecution}} implementation. The motivation is similar to that of 
> the StreamingContext's graceful shutdown: stop the "job generation" and then 
> wait for any active job to finish, instead of canceling the jobs.
> The micro-batch scheduling is managed by a {{ProcessingTimeExecutor}} inside 
> the {{MicroBatchExecution}} class.
>  
> {code:java}
> private val triggerExecutor = trigger match {
>   case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
>   case OneTimeTrigger => OneTimeExecutor()
>   case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
> }
> {code}
> The following while-true loop is run by the job-generation mechanism. The 
> {{triggerHandler}} is a callback that generates the micro-batches.
> {code:java}
> override def execute(triggerHandler: () => Boolean): Unit = {
>   while (true) {
> val triggerTimeMs = clock.getTimeMillis
> val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
> val terminated = !triggerHandler()
> if (intervalMs > 0) {
>   val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
>   if (batchElapsedTimeMs > intervalMs) {
> notifyBatchFallingBehind(batchElapsedTimeMs)
>   }
>   if (terminated) {
> return
>   }
>   clock.waitTillTime(nextTriggerTimeMs)
> } else {
>   if (terminated) {
> return
>   }
> }
>   }
> }
> {code}
> Here, a {{gracefulStop()}} signal from the query could tell the 
> {{ProcessingTimeExecutor}} to stop triggering new batches.
> Another mechanism would then wait until any in-flight job has finished before 
> calling {{stop()}}, after which the {{SparkSession}} could be stopped as well.
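As an illustration of the proposal, a graceful stop could be exposed as a flag 
that the trigger loop checks before scheduling the next batch. The sketch below 
is illustrative only: the class name and the {{gracefulStop()}} method are 
hypothetical, not part of Spark's API, and the sleep is a simplified stand-in 
for the real clock-based wait.
{code:java}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical graceful-stop-aware trigger loop (illustrative, not Spark code).
class GracefulTriggerExecutor(intervalMs: Long) {
  private val gracefulStopRequested = new AtomicBoolean(false)

  // Called from the outside to request a graceful stop.
  def gracefulStop(): Unit = gracefulStopRequested.set(true)

  // triggerHandler runs one micro-batch and returns false once the query terminated.
  def execute(triggerHandler: () => Boolean): Unit = {
    var continue = true
    while (continue) {
      val terminated = !triggerHandler()   // always let the current batch finish
      if (terminated || gracefulStopRequested.get) {
        continue = false                   // stop scheduling new batches
      } else if (intervalMs > 0) {
        Thread.sleep(intervalMs)           // simplified stand-in for clock.waitTillTime
      }
    }
  }
}
{code}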






[jira] [Updated] (SPARK-35299) Dataframe overwrite on S3 does not delete old files with S3 object-put to table path

2021-05-03 Thread Yusheng Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yusheng Ding updated SPARK-35299:
-
Description: 
To reproduce:

test_table path: s3a://test_bucket/test_table/

 

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 1000

#S3 operation##

s3 = boto3.client("s3")
 s3.put_object(
     Bucket="test_bucket", Body="", Key=f"test_table/"
 )

#S3 operation##

df.write.insertInto(test_table, overwrite=True)

#Same goes to df.write.save(mode="overwrite", format="parquet", 
path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 2000

 

Overwrite is not functioning correctly. Old files will not be deleted on S3.

 

 

 

  was:
To reproduce:

test_table path: s3a://test_bucket/test_table/

 

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 1000

#S3 operation##

s3 = boto3.client("s3")
 s3.put_object(
     Bucket="test_bucket", Body="", Key=f"test_table/"
 )

#S3 operation##

df.write.insertInto(test_table, overwrite=True)

## Same goes to df.write.save(mode="overwrite", format="parquet", 
path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 2000

 

Overwrite is not functioning correctly. Old files will not be deleted on S3.

 

 

 


> Dataframe overwrite on S3 does not delete old files with S3 object-put to 
> table path
> 
>
> Key: SPARK-35299
> URL: https://issues.apache.org/jira/browse/SPARK-35299
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yusheng Ding
>Priority: Major
>  Labels: aws-s3, dataframe, hive, spark
>
> To reproduce:
> test_table path: s3a://test_bucket/test_table/
>  
> df = spark_session.sql("SELECT * FROM test_table")
> df.count()  # produce row number 1000
> #S3 operation##
> s3 = boto3.client("s3")
>  s3.put_object(
>      Bucket="test_bucket", Body="", Key=f"test_table/"
>  )
> #S3 operation##
> df.write.insertInto(test_table, overwrite=True)
> #Same goes to df.write.save(mode="overwrite", format="parquet", 
> path="s3a://test_bucket/test_table")
> df = spark_session.sql("SELECT * FROM test_table")
> df.count()  # produce row number 2000
>  
> Overwrite is not functioning correctly. Old files will not be deleted on S3.
>  
>  
>  
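As a quick sanity check of what the overwrite actually leaves behind, the table 
root can be listed through the same s3a filesystem Spark uses, before and after 
the insertInto call. This is an illustrative Scala sketch only (the path and the 
existing {{spark}} session are assumed from the reproduction above), not a fix:
{code:java}
import org.apache.hadoop.fs.Path

// List the objects Spark sees under the table root; running this before and
// after the overwrite shows whether the old part files were really deleted.
val tablePath = new Path("s3a://test_bucket/test_table/")
val fs = tablePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(tablePath).foreach(st => println(s"${st.getLen}\t${st.getPath}"))
{code}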






[jira] [Updated] (SPARK-35299) Dataframe overwrite on S3 does not delete old files with S3 object-put to table path

2021-05-03 Thread Yusheng Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yusheng Ding updated SPARK-35299:
-
Description: 
To reproduce:

test_table path: s3a://test_bucket/test_table/

 

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 1000

#S3 operation##

s3 = boto3.client("s3")
 s3.put_object(
     Bucket="test_bucket", Body="", Key=f"test_table/"
 )

#S3 operation##

df.write.insertInto(test_table, overwrite=True)

## Same goes to df.write.save(mode="overwrite", format="parquet", 
path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 2000

 

Overwrite is not functioning correctly. Old files will not be deleted on S3.

 

 

 

  was:
To reproduce:

test_table path: s3a://test_bucket/test_table/

 

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 1000

#S3 operation##

s3 = boto3.client("s3")
 s3.put_object(
     Bucket="test_bucket", Body="", Key=f"test_table/"
 )

#S3 operation##

df.write.insertInto(test_table, overwrite=True)

# Same goes to df.write.save(mode="overwrite", format="parquet", 
path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 2000

 

Overwrite is not functioning correctly. Old files will not be deleted on S3.

 

 

 


> Dataframe overwrite on S3 does not delete old files with S3 object-put to 
> table path
> 
>
> Key: SPARK-35299
> URL: https://issues.apache.org/jira/browse/SPARK-35299
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yusheng Ding
>Priority: Major
>  Labels: aws-s3, dataframe, hive, spark
>
> To reproduce:
> test_table path: s3a://test_bucket/test_table/
>  
> df = spark_session.sql("SELECT * FROM test_table")
> df.count()  # produce row number 1000
> #S3 operation##
> s3 = boto3.client("s3")
>  s3.put_object(
>      Bucket="test_bucket", Body="", Key=f"test_table/"
>  )
> #S3 operation##
> df.write.insertInto(test_table, overwrite=True)
> ## Same goes to df.write.save(mode="overwrite", format="parquet", 
> path="s3a://test_bucket/test_table")
> df = spark_session.sql("SELECT * FROM test_table")
> df.count()  # produce row number 2000
>  
> Overwrite is not functioning correctly. Old files will not be deleted on S3.
>  
>  
>  






[jira] [Updated] (SPARK-35299) Dataframe overwrite on S3 does not delete old files with S3 object-put to table path

2021-05-03 Thread Yusheng Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yusheng Ding updated SPARK-35299:
-
Description: 
To reproduce:

test_table path: s3a://test_bucket/test_table/

 

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 1000

#S3 operation##

s3 = boto3.client("s3")
 s3.put_object(
     Bucket="test_bucket", Body="", Key=f"test_table/"
 )

#S3 operation##

df.write.insertInto(test_table, overwrite=True)

## Same goes to df.write.save(mode="overwrite", format="parquet", 
path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 2000

 

Overwrite is not functioning correctly. Old files will not be deleted on S3.

 

 

 

  was:
To reproduce:

test_table path: s3a://test_bucket/test_table/

 

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 1000

#S3 operation##

s3 = boto3.client("s3")
 s3.put_object(
     Bucket="test_bucket", Body="", Key=f"test_table/"
 )

#S3 operation##

df.write.insertInto(test_table, overwrite=True)

## Same goes to df.write.save(mode="overwrite", format="parquet", 
path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 2000

 

Overwrite is not functioning correctly. Old files will not be deleted on S3.

 

 

 


> Dataframe overwrite on S3 does not delete old files with S3 object-put to 
> table path
> 
>
> Key: SPARK-35299
> URL: https://issues.apache.org/jira/browse/SPARK-35299
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yusheng Ding
>Priority: Major
>  Labels: aws-s3, dataframe, hive, spark
>
> To reproduce:
> test_table path: s3a://test_bucket/test_table/
>  
> df = spark_session.sql("SELECT * FROM test_table")
> df.count()  # produce row number 1000
> #S3 operation##
> s3 = boto3.client("s3")
>  s3.put_object(
>      Bucket="test_bucket", Body="", Key=f"test_table/"
>  )
> #S3 operation##
> df.write.insertInto(test_table, overwrite=True)
> ## Same goes to df.write.save(mode="overwrite", format="parquet", 
> path="s3a://test_bucket/test_table")
> df = spark_session.sql("SELECT * FROM test_table")
> df.count()  # produce row number 2000
>  
> Overwrite is not functioning correctly. Old files will not be deleted on S3.
>  
>  
>  






[jira] [Updated] (SPARK-35299) Dataframe overwrite on S3 does not delete old files with S3 object-put to table path

2021-05-03 Thread Yusheng Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yusheng Ding updated SPARK-35299:
-
Description: 
To reproduce:

test_table path: s3a://test_bucket/test_table/

 

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 1000

#S3 operation##

s3 = boto3.client("s3")
 s3.put_object(
     Bucket="test_bucket", Body="", Key=f"test_table/"
 )

#S3 operation##

df.write.insertInto(test_table, overwrite=True)

# Same goes to df.write.save(mode="overwrite", format="parquet", 
path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 2000

 

Overwrite is not functioning correctly. Old files will not be deleted on S3.

 

 

 

  was:
To reproduce:

test_table path: s3a://test_bucket/test_table/

 

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 1000

#S3 operation##

s3 = boto3.client("s3")
s3.put_object(
    Bucket="test_bucket", Body="", Key=f"test_table/"
)

#S3 operation##

df.write.insertInto(test_table, overwrite=True)

# Same goes to df.write.save(mode="overwrite", format="parquet", 
path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 2000

 

Overwrite is not functioning correctly. Old files will not be deleted on S3.

 

 

 


> Dataframe overwrite on S3 does not delete old files with S3 object-put to 
> table path
> 
>
> Key: SPARK-35299
> URL: https://issues.apache.org/jira/browse/SPARK-35299
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yusheng Ding
>Priority: Major
>  Labels: aws-s3, dataframe, hive, spark
>
> To reproduce:
> test_table path: s3a://test_bucket/test_table/
>  
> df = spark_session.sql("SELECT * FROM test_table")
> df.count()  # produce row number 1000
> #S3 operation##
> s3 = boto3.client("s3")
>  s3.put_object(
>      Bucket="test_bucket", Body="", Key=f"test_table/"
>  )
> #S3 operation##
> df.write.insertInto(test_table, overwrite=True)
> # Same goes to df.write.save(mode="overwrite", format="parquet", 
> path="s3a://test_bucket/test_table")
> df = spark_session.sql("SELECT * FROM test_table")
> df.count()  # produce row number 2000
>  
> Overwrite is not functioning correctly. Old files will not be deleted on S3.
>  
>  
>  






[jira] [Created] (SPARK-35299) Dataframe overwrite on S3 does not delete old files with S3 object-put to table path

2021-05-03 Thread Yusheng Ding (Jira)
Yusheng Ding created SPARK-35299:


 Summary: Dataframe overwrite on S3 does not delete old files with 
S3 object-put to table path
 Key: SPARK-35299
 URL: https://issues.apache.org/jira/browse/SPARK-35299
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Yusheng Ding


To reproduce:

test_table path: s3a://test_bucket/test_table/

 

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 1000

#S3 operation##

s3 = boto3.client("s3")
s3.put_object(
    Bucket="test_bucket", Body="", Key=f"test_table/"
)

#S3 operation##

df.write.insertInto(test_table, overwrite=True)

# Same goes to df.write.save(mode="overwrite", format="parquet", 
path="s3a://test_bucket/test_table")

df = spark_session.sql("SELECT * FROM test_table")

df.count()  # produce row number 2000

 

Overwrite is not functioning correctly. Old files will not be deleted on S3.

 

 

 






[jira] [Updated] (SPARK-31933) Found another deadlock in Spark Driver

2020-06-08 Thread Wenning Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenning Ding updated SPARK-31933:
-
Description: 
This is another Java-level deadlock we found in Spark. It looks similar to 
https://issues.apache.org/jira/browse/SPARK-26961, but I think they are actually 
different issues.

It looks like this deadlock is caused by FsUrlStreamHandlerFactory.

One straightforward way to fix this is to change 
FsUrlStreamHandlerFactory.java#L74 from 
{{FileSystem.getFileSystemClass(protocol, conf);}} to 
{{FileSystem.getFileSystemClass(protocol, new Configuration(conf));}}

But I am not sure whether this change is acceptable on the Hadoop side, or 
whether there is a better way to solve this on the Spark side.

 
{code:java}
"SparkUI-60":  waiting to lock monitor 0x7f511ca22728 (object 
0x00068c6e9060, a org.apache.hadoop.conf.Configuration),  which is held by 
"Driver"

"Driver":  waiting to lock monitor 0x7f511c4fe448 (object 
0x000400079600, a java.util.HashMap),  which is held by "SparkUI-60"
{code}
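For readability, the cycle reported above reduces to two monitors acquired in 
opposite order by the two threads. The following is an abstract Scala sketch of 
that shape only; the names are illustrative and this is not Spark or Hadoop code:
{code:java}
// "Driver" locks the Configuration monitor and then needs the package HashMap
// monitor; "SparkUI-60" locks the package HashMap monitor and then needs the
// Configuration monitor. Neither can proceed.
object DeadlockShape {
  private val hadoopConf = new Object // stands in for the shared Configuration
  private val packageMap = new Object // stands in for java.lang.Package's HashMap

  def driverThread(): Unit =
    hadoopConf.synchronized {
      packageMap.synchronized { /* defineSystemPackage during class loading */ }
    }

  def sparkUiThread(): Unit =
    packageMap.synchronized {
      hadoopConf.synchronized { /* Configuration.getOverlay via FsUrlStreamHandlerFactory */ }
    }
}
{code}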
 
{code:java}
"SparkUI-60":
at org.apache.hadoop.conf.Configuration.getOverlay(Configuration.java:1328) 
- waiting to lock <0x00068c6e9060> (a org.apache.hadoop.conf.Configuration) 
at 
org.apache.hadoop.conf.Configuration.handleDeprecation(Configuration.java:684) 
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1088) 
at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1145) 
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2363) 
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2840) 
at 
org.apache.hadoop.fs.FsUrlStreamHandlerFactory.createURLStreamHandler(FsUrlStreamHandlerFactory.java:74)
 
at java.net.URL.getURLStreamHandler(URL.java:1165) 
at java.net.URL.(URL.java:422) 
at java.net.URL.(URL.java:312) 
at java.net.URL.(URL.java:335) 
at sun.net.www.ParseUtil.fileToEncodedURL(ParseUtil.java:272) 
at java.lang.Package$1.run(Package.java:579) 
at java.lang.Package$1.run(Package.java:570) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.lang.Package.defineSystemPackage(Package.java:570) 
at java.lang.Package.getSystemPackage(Package.java:546) 
- locked <0x000400079600> (a java.util.HashMap) 
at java.lang.ClassLoader.getPackage(ClassLoader.java:1630) 
at java.lang.ClassLoader.getPackage(ClassLoader.java:1628) 
at java.net.URLClassLoader.getAndVerifyPackage(URLClassLoader.java:394) 
at java.net.URLClassLoader.definePackageInternal(URLClassLoader.java:420) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:452) 
at java.net.URLClassLoader.access$100(URLClassLoader.java:74) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:369) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:363) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:362) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) 
- locked <0x00068dd20910> (a java.lang.Object) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:352) 
at 
org.glassfish.jersey.model.internal.ComponentBag.modelFor(ComponentBag.java:483)
 
at 
org.glassfish.jersey.model.internal.ComponentBag.access$100(ComponentBag.java:89)
 
at 
org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:408) 
at 
org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:398) 
at org.glassfish.jersey.internal.Errors.process(Errors.java:315) 
at org.glassfish.jersey.internal.Errors.process(Errors.java:297) 
at org.glassfish.jersey.internal.Errors.process(Errors.java:228) 
at 
org.glassfish.jersey.model.internal.ComponentBag.registerModel(ComponentBag.java:398)
 at 
org.glassfish.jersey.model.internal.ComponentBag.register(ComponentBag.java:235)
 
at 
org.glassfish.jersey.model.internal.CommonConfig.register(CommonConfig.java:420)
 
at org.glassfish.jersey.server.ResourceConfig.register(ResourceConfig.java:425) 
at 
org.glassfish.jersey.server.ResourceConfig.registerClasses(ResourceConfig.java:501)
 
at 
org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.(ResourceConfig.java:1212)
 
at 
org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.(ResourceConfig.java:1178)
 
at 
org.glassfish.jersey.server.ResourceConfig.createRuntimeConfig(ResourceConfig.java:1174)
 at 
org.glassfish.jersey.server.ApplicationHandler.(ApplicationHandler.java:345)
 
at org.glassfish.jersey.servlet.WebComponent.(WebComponent.java:392) 
at 
org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177) 
at 
org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369) 
at javax.servlet.GenericServlet.init(GenericServlet.java:244) 
at 
org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643)
 
at 

[jira] [Updated] (SPARK-31933) Found another deadlock in Spark Driver

2020-06-08 Thread Wenning Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenning Ding updated SPARK-31933:
-
Description: 
This is another Java-level deadlock we found in Spark. It looks similar to 
https://issues.apache.org/jira/browse/SPARK-26961, but I think they are actually 
different issues.

 
{code:java}
"SparkUI-60":  waiting to lock monitor 0x7f511ca22728 (object 
0x00068c6e9060, a org.apache.hadoop.conf.Configuration),  which is held by 
"Driver"

"Driver":  waiting to lock monitor 0x7f511c4fe448 (object 
0x000400079600, a java.util.HashMap),  which is held by "SparkUI-60"
{code}
 
{code:java}
"SparkUI-60":
at org.apache.hadoop.conf.Configuration.getOverlay(Configuration.java:1328) 
- waiting to lock <0x00068c6e9060> (a org.apache.hadoop.conf.Configuration) 
at 
org.apache.hadoop.conf.Configuration.handleDeprecation(Configuration.java:684) 
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1088) 
at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1145) 
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2363) 
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2840) 
at 
org.apache.hadoop.fs.FsUrlStreamHandlerFactory.createURLStreamHandler(FsUrlStreamHandlerFactory.java:74)
 
at java.net.URL.getURLStreamHandler(URL.java:1165) 
at java.net.URL.(URL.java:422) 
at java.net.URL.(URL.java:312) 
at java.net.URL.(URL.java:335) 
at sun.net.www.ParseUtil.fileToEncodedURL(ParseUtil.java:272) 
at java.lang.Package$1.run(Package.java:579) 
at java.lang.Package$1.run(Package.java:570) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.lang.Package.defineSystemPackage(Package.java:570) 
at java.lang.Package.getSystemPackage(Package.java:546) 
- locked <0x000400079600> (a java.util.HashMap) 
at java.lang.ClassLoader.getPackage(ClassLoader.java:1630) 
at java.lang.ClassLoader.getPackage(ClassLoader.java:1628) 
at java.net.URLClassLoader.getAndVerifyPackage(URLClassLoader.java:394) 
at java.net.URLClassLoader.definePackageInternal(URLClassLoader.java:420) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:452) 
at java.net.URLClassLoader.access$100(URLClassLoader.java:74) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:369) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:363) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:362) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) 
- locked <0x00068dd20910> (a java.lang.Object) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:352) 
at 
org.glassfish.jersey.model.internal.ComponentBag.modelFor(ComponentBag.java:483)
 
at 
org.glassfish.jersey.model.internal.ComponentBag.access$100(ComponentBag.java:89)
 
at 
org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:408) 
at 
org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:398) 
at org.glassfish.jersey.internal.Errors.process(Errors.java:315) 
at org.glassfish.jersey.internal.Errors.process(Errors.java:297) 
at org.glassfish.jersey.internal.Errors.process(Errors.java:228) 
at 
org.glassfish.jersey.model.internal.ComponentBag.registerModel(ComponentBag.java:398)
 at 
org.glassfish.jersey.model.internal.ComponentBag.register(ComponentBag.java:235)
 
at 
org.glassfish.jersey.model.internal.CommonConfig.register(CommonConfig.java:420)
 
at org.glassfish.jersey.server.ResourceConfig.register(ResourceConfig.java:425) 
at 
org.glassfish.jersey.server.ResourceConfig.registerClasses(ResourceConfig.java:501)
 
at 
org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.(ResourceConfig.java:1212)
 
at 
org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.(ResourceConfig.java:1178)
 
at 
org.glassfish.jersey.server.ResourceConfig.createRuntimeConfig(ResourceConfig.java:1174)
 at 
org.glassfish.jersey.server.ApplicationHandler.(ApplicationHandler.java:345)
 
at org.glassfish.jersey.servlet.WebComponent.(WebComponent.java:392) 
at 
org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177) 
at 
org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369) 
at javax.servlet.GenericServlet.init(GenericServlet.java:244) 
at 
org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643)
 
at 
org.spark_project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:499)
 
- locked <0x000407a5a840> (a org.spark_project.jetty.servlet.ServletHolder) 
at 
org.spark_project.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:791)
 
- locked <0x000407a5a840> (a org.spark_project.jetty.servlet.ServletHolder) 
at 
org.spark_project.jetty.servlet.ServletHolder.prepare(ServletHolder.java:776) 
at 

[jira] [Updated] (SPARK-31933) Found another deadlock in Spark Driver

2020-06-08 Thread Wenning Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenning Ding updated SPARK-31933:
-
Description: 
This is another Java-level deadlock we found in Spark. It looks similar to 
https://issues.apache.org/jira/browse/SPARK-26961, but I think they are actually 
different issues.

 
{code:java}
"SparkUI-60":  waiting to lock monitor 0x7f511ca22728 (object 
0x00068c6e9060, a org.apache.hadoop.conf.Configuration),  which is held by 
"Driver"

"Driver":  waiting to lock monitor 0x7f511c4fe448 (object 
0x000400079600, a java.util.HashMap),  which is held by "SparkUI-60"
{code}
 
{code:java}
"SparkUI-60":"SparkUI-60": 
at org.apache.hadoop.conf.Configuration.getOverlay(Configuration.java:1328) - 
waiting to lock <0x00068c6e9060> (a org.apache.hadoop.conf.Configuration) 
at 
org.apache.hadoop.conf.Configuration.handleDeprecation(Configuration.java:684) 
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1088) 
at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1145) 
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2363) 
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2840) 
at 
org.apache.hadoop.fs.FsUrlStreamHandlerFactory.createURLStreamHandler(FsUrlStreamHandlerFactory.java:74)
 at java.net.URL.getURLStreamHandler(URL.java:1165) 
at java.net.URL.(URL.java:422) 
at java.net.URL.(URL.java:312) 
at java.net.URL.(URL.java:335) 
at sun.net.www.ParseUtil.fileToEncodedURL(ParseUtil.java:272) 
at java.lang.Package$1.run(Package.java:579) at 
java.lang.Package$1.run(Package.java:570) at 
java.security.AccessController.doPrivileged(Native Method) 
at java.lang.Package.defineSystemPackage(Package.java:570) 
at java.lang.Package.getSystemPackage(Package.java:546) - locked 
<0x000400079600> (a java.util.HashMap) 
at java.lang.ClassLoader.getPackage(ClassLoader.java:1630) 
at java.lang.ClassLoader.getPackage(ClassLoader.java:1628) 
at java.net.URLClassLoader.getAndVerifyPackage(URLClassLoader.java:394) 
at java.net.URLClassLoader.definePackageInternal(URLClassLoader.java:420) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:452) 
at java.net.URLClassLoader.access$100(URLClassLoader.java:74) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:369) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:363) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:362) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) - locked 
<0x00068dd20910> (a java.lang.Object) at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:352) 
at 
org.glassfish.jersey.model.internal.ComponentBag.modelFor(ComponentBag.java:483)
 
at 
org.glassfish.jersey.model.internal.ComponentBag.access$100(ComponentBag.java:89)
 
at 
org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:408) 
at 
org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:398) 
at org.glassfish.jersey.internal.Errors.process(Errors.java:315) 
at org.glassfish.jersey.internal.Errors.process(Errors.java:297) 
at org.glassfish.jersey.internal.Errors.process(Errors.java:228) 
at 
org.glassfish.jersey.model.internal.ComponentBag.registerModel(ComponentBag.java:398)
 at 
org.glassfish.jersey.model.internal.ComponentBag.register(ComponentBag.java:235)
 
at 
org.glassfish.jersey.model.internal.CommonConfig.register(CommonConfig.java:420)
 
at org.glassfish.jersey.server.ResourceConfig.register(ResourceConfig.java:425) 
at 
org.glassfish.jersey.server.ResourceConfig.registerClasses(ResourceConfig.java:501)
 
at 
org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.(ResourceConfig.java:1212)
 at 
org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.(ResourceConfig.java:1178)
 
at 
org.glassfish.jersey.server.ResourceConfig.createRuntimeConfig(ResourceConfig.java:1174)
 at 
org.glassfish.jersey.server.ApplicationHandler.(ApplicationHandler.java:345)
 
at org.glassfish.jersey.servlet.WebComponent.(WebComponent.java:392) 
at 
org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177) 
at 
org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369) 
at javax.servlet.GenericServlet.init(GenericServlet.java:244) 
at 
org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643)
 
at 
org.spark_project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:499)
 - locked <0x000407a5a840> (a 
org.spark_project.jetty.servlet.ServletHolder) 
at 
org.spark_project.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:791)
 - locked <0x000407a5a840> (a 
org.spark_project.jetty.servlet.ServletHolder) 
at 
org.spark_project.jetty.servlet.ServletHolder.prepare(ServletHolder.java:776) 
at 

[jira] [Created] (SPARK-31933) Found another deadlock in Spark Driver

2020-06-08 Thread Wenning Ding (Jira)
Wenning Ding created SPARK-31933:


 Summary: Found another deadlock in Spark Driver
 Key: SPARK-31933
 URL: https://issues.apache.org/jira/browse/SPARK-31933
 Project: Spark
  Issue Type: Bug
  Components: Spark Submit
Affects Versions: 2.4.6, 3.1.0
Reporter: Wenning Ding


This is another Java-level deadlock we found in Spark. It looks similar to 
https://issues.apache.org/jira/browse/SPARK-26961, but I think they are actually 
different issues.

 
{code:java}
"SparkUI-60":  waiting to lock monitor 0x7f511ca22728 (object 
0x00068c6e9060, a org.apache.hadoop.conf.Configuration),  which is held by 
"Driver"

"Driver":  waiting to lock monitor 0x7f511c4fe448 (object 
0x000400079600, a java.util.HashMap),  which is held by "SparkUI-60"
{code}
{code:java}
 
"SparkUI-60": at 
org.apache.hadoop.conf.Configuration.getOverlay(Configuration.java:1328) - 
waiting to lock <0x00068c6e9060> (a org.apache.hadoop.conf.Configuration) 
at 
org.apache.hadoop.conf.Configuration.handleDeprecation(Configuration.java:684) 
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1088) at 
org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1145) at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2363) at 
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2840) at 
org.apache.hadoop.fs.FsUrlStreamHandlerFactory.createURLStreamHandler(FsUrlStreamHandlerFactory.java:74)
 at java.net.URL.getURLStreamHandler(URL.java:1165) at 
java.net.URL.(URL.java:422) at java.net.URL.(URL.java:312) at 
java.net.URL.(URL.java:335) at 
sun.net.www.ParseUtil.fileToEncodedURL(ParseUtil.java:272) at 
java.lang.Package$1.run(Package.java:579) at 
java.lang.Package$1.run(Package.java:570) at 
java.security.AccessController.doPrivileged(Native Method) at 
java.lang.Package.defineSystemPackage(Package.java:570) at 
java.lang.Package.getSystemPackage(Package.java:546) - locked 
<0x000400079600> (a java.util.HashMap) at 
java.lang.ClassLoader.getPackage(ClassLoader.java:1630) at 
java.lang.ClassLoader.getPackage(ClassLoader.java:1628) at 
java.net.URLClassLoader.getAndVerifyPackage(URLClassLoader.java:394) at 
java.net.URLClassLoader.definePackageInternal(URLClassLoader.java:420) at 
java.net.URLClassLoader.defineClass(URLClassLoader.java:452) at 
java.net.URLClassLoader.access$100(URLClassLoader.java:74) at 
java.net.URLClassLoader$1.run(URLClassLoader.java:369) at 
java.net.URLClassLoader$1.run(URLClassLoader.java:363) at 
java.security.AccessController.doPrivileged(Native Method) at 
java.net.URLClassLoader.findClass(URLClassLoader.java:362) at 
java.lang.ClassLoader.loadClass(ClassLoader.java:419) - locked 
<0x00068dd20910> (a java.lang.Object) at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at 
java.lang.ClassLoader.loadClass(ClassLoader.java:352) at 
org.glassfish.jersey.model.internal.ComponentBag.modelFor(ComponentBag.java:483)
 at 
org.glassfish.jersey.model.internal.ComponentBag.access$100(ComponentBag.java:89)
 at 
org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:408) 
at 
org.glassfish.jersey.model.internal.ComponentBag$5.call(ComponentBag.java:398) 
at org.glassfish.jersey.internal.Errors.process(Errors.java:315) at 
org.glassfish.jersey.internal.Errors.process(Errors.java:297) at 
org.glassfish.jersey.internal.Errors.process(Errors.java:228) at 
org.glassfish.jersey.model.internal.ComponentBag.registerModel(ComponentBag.java:398)
 at 
org.glassfish.jersey.model.internal.ComponentBag.register(ComponentBag.java:235)
 at 
org.glassfish.jersey.model.internal.CommonConfig.register(CommonConfig.java:420)
 at 
org.glassfish.jersey.server.ResourceConfig.register(ResourceConfig.java:425) at 
org.glassfish.jersey.server.ResourceConfig.registerClasses(ResourceConfig.java:501)
 at 
org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.(ResourceConfig.java:1212)
 at 
org.glassfish.jersey.server.ResourceConfig$RuntimeConfig.(ResourceConfig.java:1178)
 at 
org.glassfish.jersey.server.ResourceConfig.createRuntimeConfig(ResourceConfig.java:1174)
 at 
org.glassfish.jersey.server.ApplicationHandler.(ApplicationHandler.java:345)
 at org.glassfish.jersey.servlet.WebComponent.(WebComponent.java:392) at 
org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177) 
at 
org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369) 
at javax.servlet.GenericServlet.init(GenericServlet.java:244) at 
org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643)
 at 
org.spark_project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:499)
 - locked <0x000407a5a840> (a 
org.spark_project.jetty.servlet.ServletHolder) at 
org.spark_project.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:791)
 - locked <0x000407a5a840> (a 

[jira] [Commented] (SPARK-29868) Add a benchmark for Adaptive Execution

2019-11-12 Thread leonard ding (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16972974#comment-16972974
 ] 

leonard ding commented on SPARK-29868:
--

I will pick it up.

> Add a benchmark for Adaptive Execution
> --
>
> Key: SPARK-29868
> URL: https://issues.apache.org/jira/browse/SPARK-29868
> Project: Spark
>  Issue Type: Test
>  Components: SQL, Tests
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
> Attachments: BroadcastJoin.jpeg, SortMergeJoin.jpeg
>
>
> Add a benchmark for Adaptive Execution to evaluate SortMergeJoin versus 
> BroadcastJoin performance.
> It seems SortMergeJoin can be faster than BroadcastJoin when one side is a 
> bucketed table that is eligible for conversion to BroadcastJoin.
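Such a comparison could be driven by toggling the relevant SQL configs around 
the same query. An illustrative sketch only; the table names are made up, while 
the config keys are standard Spark SQL settings:
{code:java}
// Compare SortMergeJoin vs BroadcastHashJoin on the same bucketed join.
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Keep SortMergeJoin by disabling automatic broadcast:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.time(spark.sql("SELECT count(*) FROM bucketed_fact f JOIN dim d ON f.id = d.id").collect())

// Allow the broadcast conversion and measure again:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600")
spark.time(spark.sql("SELECT count(*) FROM bucketed_fact f JOIN dim d ON f.id = d.id").collect())
{code}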






[jira] [Comment Edited] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6

2016-10-15 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15578987#comment-15578987
 ] 

ding edited comment on SPARK-17951 at 10/16/16 12:22 AM:
-

I tried to call rdd.collect, which internally calls bm.getRemoteBytes in 
multiple threads. The data shows that the difference in time spent on block 
fetch is also around 0.1 s between Spark 1.5.1 and Spark 1.6.2, and it can be 
reproduced consistently.

In our scenario, we call getRemoteBytes twice. With Spark 1.6.2 this adds an 
extra 0.2 s of overhead and causes a throughput decrease of around 5. We would 
like to reduce the overhead if possible.


was (Author: ding):
I tried to call rdd.collect which internally called bm.getRemoteBytes in 
multiple threads. The data shows difference of time spend on block fetch is 
also around 0.1s between spark 1.5.1 and spark 1.6.2. And it can be 
consistently repro.

In our scenario, we called getRemoteBytes twice. If we use spark 1.6.2 it will 
bring additional 0.2s overhead and cause around 5 decrease of throughput. we 
would like to reduce the overhead as much as possible.

> BlockFetch with multiple threads slows down after spark 1.6
> ---
>
> Key: SPARK-17951
> URL: https://issues.apache.org/jira/browse/SPARK-17951
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 1.6.2
> Environment: cluster with 8 node, each node has 28 cores. 10Gb network
>Reporter: ding
>
> The following code demonstrates the issue:
> {code}
> def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName(s"BMTest")
> val size = 3344570
> val sc = new SparkContext(conf)
> val data = sc.parallelize(1 to 100, 8)
> var accum = sc.accumulator(0.0, "get remote bytes")
> var i = 0
> while(i < 91) {
>   accum = sc.accumulator(0.0, "get remote bytes")
>   val test = data.mapPartitionsWithIndex { (pid, iter) =>
> val N = size
> val bm = SparkEnv.get.blockManager
> val blockId = TaskResultBlockId(10*i + pid)
> val test = new Array[Byte](N)
> Random.nextBytes(test)
> val buffer = ByteBuffer.allocate(N)
> buffer.limit(N)
> buffer.put(test)
> bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
> Iterator(1)
>   }.count()
>   
>   data.mapPartitionsWithIndex { (pid, iter) =>
> val before = System.nanoTime()
> 
> val bm = SparkEnv.get.blockManager
> (0 to 7).map(s => {
>   Future {
> val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s))
>   }
> }).map(Await.result(_, Duration.Inf))
> 
> accum.add((System.nanoTime() - before) / 1e9)
> Iterator(1)
>   }.count()
>   println("get remote bytes take: " + accum.value/8)
>   i += 1
> }
>   }
> {code}
> In Spark 1.6.2 the average "get remote bytes" time is 0.19 s, while
> in Spark 1.5.1 the average "get remote bytes" time is 0.09 s.
> However, if the blocks are fetched in a single thread, the gap is much smaller:
> spark 1.6.2  get remote bytes: 0.21 s
> spark 1.5.1  get remote bytes: 0.20 s
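For completeness, the benchmark above relies on roughly the following imports 
and an implicit execution context for the Futures. This is an illustrative 
reconstruction, not part of the original report, and exact package locations 
may vary by Spark version:
{code:java}
import java.nio.ByteBuffer

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
{code}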






[jira] [Commented] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6

2016-10-15 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15578987#comment-15578987
 ] 

ding commented on SPARK-17951:
--

I tried to call rdd.collect, which internally calls bm.getRemoteBytes in 
multiple threads. The data shows that the difference in time spent on block 
fetch is also around 0.1 s between Spark 1.5.1 and Spark 1.6.2, and it can be 
reproduced consistently.

In our scenario, we call getRemoteBytes twice. With Spark 1.6.2 this adds an 
extra 0.2 s of overhead and causes a throughput decrease of around 5. We would 
like to reduce the overhead as much as possible.

> BlockFetch with multiple threads slows down after spark 1.6
> ---
>
> Key: SPARK-17951
> URL: https://issues.apache.org/jira/browse/SPARK-17951
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 1.6.2
> Environment: cluster with 8 node, each node has 28 cores. 10Gb network
>Reporter: ding
>
> The following code demonstrates the issue:
> {code}
> def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName(s"BMTest")
> val size = 3344570
> val sc = new SparkContext(conf)
> val data = sc.parallelize(1 to 100, 8)
> var accum = sc.accumulator(0.0, "get remote bytes")
> var i = 0
> while(i < 91) {
>   accum = sc.accumulator(0.0, "get remote bytes")
>   val test = data.mapPartitionsWithIndex { (pid, iter) =>
> val N = size
> val bm = SparkEnv.get.blockManager
> val blockId = TaskResultBlockId(10*i + pid)
> val test = new Array[Byte](N)
> Random.nextBytes(test)
> val buffer = ByteBuffer.allocate(N)
> buffer.limit(N)
> buffer.put(test)
> bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
> Iterator(1)
>   }.count()
>   
>   data.mapPartitionsWithIndex { (pid, iter) =>
> val before = System.nanoTime()
> 
> val bm = SparkEnv.get.blockManager
> (0 to 7).map(s => {
>   Future {
> val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s))
>   }
> }).map(Await.result(_, Duration.Inf))
> 
> accum.add((System.nanoTime() - before) / 1e9)
> Iterator(1)
>   }.count()
>   println("get remote bytes take: " + accum.value/8)
>   i += 1
> }
>   }
> {code}
> In Spark 1.6.2 the average "get remote bytes" time is 0.19 s, while
> in Spark 1.5.1 the average "get remote bytes" time is 0.09 s.
> However, if the blocks are fetched in a single thread, the gap is much smaller:
> spark 1.6.2  get remote bytes: 0.21 s
> spark 1.5.1  get remote bytes: 0.20 s






[jira] [Updated] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6

2016-10-14 Thread ding (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ding updated SPARK-17951:
-
Description: 
The following code demonstrates the issue:
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(s"BMTest")
val size = 3344570
val sc = new SparkContext(conf)

val data = sc.parallelize(1 to 100, 8)
var accum = sc.accumulator(0.0, "get remote bytes")
var i = 0
while(i < 91) {
  accum = sc.accumulator(0.0, "get remote bytes")
  val test = data.mapPartitionsWithIndex { (pid, iter) =>
val N = size
val bm = SparkEnv.get.blockManager
val blockId = TaskResultBlockId(10*i + pid)
val test = new Array[Byte](N)
Random.nextBytes(test)
val buffer = ByteBuffer.allocate(N)
buffer.limit(N)
buffer.put(test)
bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
Iterator(1)
  }.count()
  
  data.mapPartitionsWithIndex { (pid, iter) =>
val before = System.nanoTime()

val bm = SparkEnv.get.blockManager
(0 to 7).map(s => {
  Future {
val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s))
  }
}).map(Await.result(_, Duration.Inf))

accum.add((System.nanoTime() - before) / 1e9)
Iterator(1)
  }.count()
  println("get remote bytes take: " + accum.value/8)
  i += 1
}
  }

In Spark 1.6.2 the average "get remote bytes" time is 0.19 s, while
in Spark 1.5.1 the average "get remote bytes" time is 0.09 s.

However, if the blocks are fetched in a single thread, the gap is much smaller:
spark 1.6.2  get remote bytes: 0.21 s
spark 1.5.1  get remote bytes: 0.20 s



  was:
The following code demonstrates the issue:
 def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(s"BMTest")
val size = 3344570
val sc = new SparkContext(conf)

val data = sc.parallelize(1 to 100, 8)
var accum = sc.accumulator(0.0, "get remote bytes")
var i = 0
while(i < 91) {
  accum = sc.accumulator(0.0, "get remote bytes")
  val test = data.mapPartitionsWithIndex { (pid, iter) =>
val N = size
val bm = SparkEnv.get.blockManager
val blockId = TaskResultBlockId(10*i + pid)
val test = new Array[Byte](N)
Random.nextBytes(test)
val buffer = ByteBuffer.allocate(N)
buffer.limit(N)
buffer.put(test)
bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
Iterator(1)
  }.count()
  
  data.mapPartitionsWithIndex { (pid, iter) =>
val before = System.nanoTime()
val bm = SparkEnv.get.blockManager
(1 to 8).map(s => {
  Future {
val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s))
  }
}).map(Await.result(_, Duration.Inf))

accum.add((System.nanoTime() - before) / 1e9)
Iterator(1)
  }.count()
  println("get remote bytes take: " + accum.value/8)
  i += 1
}
  }

In spark1.6.2, average of "getting remote bytes" time is: 0.16s while
in spark 1.5.1 average of "getting remote bytes" time is: 0.07s

However if fetch block in single thread, the gap is much smaller.
spark1.6.2  get remote bytes: 0.191421s
spark1.5.1  get remote bytes: 0.181312s




> BlockFetch with multiple threads slows down after spark 1.6
> ---
>
> Key: SPARK-17951
> URL: https://issues.apache.org/jira/browse/SPARK-17951
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 1.6.2
> Environment: cluster with 8 node, each node has 28 cores. 10Gb network
>Reporter: ding
>
> The following code demonstrates the issue:
> def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName(s"BMTest")
> val size = 3344570
> val sc = new SparkContext(conf)
> val data = sc.parallelize(1 to 100, 8)
> var accum = sc.accumulator(0.0, "get remote bytes")
> var i = 0
> while(i < 91) {
>   accum = sc.accumulator(0.0, "get remote bytes")
>   val test = data.mapPartitionsWithIndex { (pid, iter) =>
> val N = size
> val bm = SparkEnv.get.blockManager
> val blockId = TaskResultBlockId(10*i + pid)
> val test = new Array[Byte](N)
> Random.nextBytes(test)
> val buffer = ByteBuffer.allocate(N)
> buffer.limit(N)
> buffer.put(test)
> bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
> Iterator(1)
>   }.count()
>   
>   data.mapPartitionsWithIndex { (pid, iter) =>
> val before = System.nanoTime()
> 
> val bm = SparkEnv.get.blockManager

[jira] [Created] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6

2016-10-14 Thread ding (JIRA)
ding created SPARK-17951:


 Summary: BlockFetch with multiple threads slows down after spark 
1.6
 Key: SPARK-17951
 URL: https://issues.apache.org/jira/browse/SPARK-17951
 Project: Spark
  Issue Type: Bug
  Components: Block Manager, Spark Core
Affects Versions: 1.6.2
 Environment: cluster with 8 node, each node has 28 cores. 10Gb network
Reporter: ding


The following code demonstrates the issue:
 def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(s"BMTest")
val size = 3344570
val sc = new SparkContext(conf)

val data = sc.parallelize(1 to 100, 8)
var accum = sc.accumulator(0.0, "get remote bytes")
var i = 0
while(i < 91) {
  accum = sc.accumulator(0.0, "get remote bytes")
  val test = data.mapPartitionsWithIndex { (pid, iter) =>
val N = size
val bm = SparkEnv.get.blockManager
val blockId = TaskResultBlockId(10*i + pid)
val test = new Array[Byte](N)
Random.nextBytes(test)
val buffer = ByteBuffer.allocate(N)
buffer.limit(N)
buffer.put(test)
bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
Iterator(1)
  }.count()
  
  data.mapPartitionsWithIndex { (pid, iter) =>
val before = System.nanoTime()
val bm = SparkEnv.get.blockManager
(1 to 8).map(s => {
  Future {
val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s))
  }
}).map(Await.result(_, Duration.Inf))

accum.add((System.nanoTime() - before) / 1e9)
Iterator(1)
  }.count()
  println("get remote bytes take: " + accum.value/8)
  i += 1
}
  }

In Spark 1.6.2 the average "get remote bytes" time is 0.16 s, while
in Spark 1.5.1 the average "get remote bytes" time is 0.07 s.

However, if the blocks are fetched in a single thread, the gap is much smaller:
spark 1.6.2  get remote bytes: 0.191421 s
spark 1.5.1  get remote bytes: 0.181312 s








[jira] [Commented] (SPARK-17559) PeriodicGraphCheckpointer did not persist edges as expected in some cases

2016-10-04 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15545991#comment-15545991
 ] 

ding commented on SPARK-17559:
--

Yes, it's the correct username. Thank you for reviewing.

> PeriodicGraphCheckpointer did not persist edges as expected in some cases
> -
>
> Key: SPARK-17559
> URL: https://issues.apache.org/jira/browse/SPARK-17559
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Reporter: ding
>Assignee: dingding
>Priority: Minor
> Fix For: 2.0.2, 2.1.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When PeriodicGraphCheckpointer is used to persist a graph, sometimes the edges 
> are not persisted: currently the graph is persisted only when the vertices' 
> storage level is NONE, but there is a chance that the vertices' storage level 
> is not NONE while the edges' level is. For example, in a graph created by an 
> outerJoinVertices operation the vertices are automatically cached while the 
> edges are not, so the edges will not be persisted when PeriodicGraphCheckpointer 
> is used to persist the graph.
> See below minimum example:
>val graphCheckpointer = new PeriodicGraphCheckpointer[Array[String], 
> Int](2, sc)
> val users = sc.textFile("data/graphx/users.txt")
>   .map(line => line.split(",")).map(parts => (parts.head.toLong, 
> parts.tail))
> val followerGraph = GraphLoader.edgeListFile(sc, 
> "data/graphx/followers.txt")
> val graph = followerGraph.outerJoinVertices(users) {
>   case (uid, deg, Some(attrList)) => attrList
>   case (uid, deg, None) => Array.empty[String]
> }
> graphCheckpointer.update(graph)






[jira] [Commented] (SPARK-17097) Pregel does not keep vertex state properly; fails to terminate

2016-09-29 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533976#comment-15533976
 ] 

ding commented on SPARK-17097:
--

Because diff behaves differently for a case class than for a regular class: a 
case class implements the equals method, while a plain class does not, so 
comparing two instances of a plain class actually compares the objects' memory 
addresses. In the code above, if we remove "case", then after the transform the 
original vertices still differ from the newly generated vertices even though 
they hold the same values. The EdgeTriplet can therefore be updated, since there 
is a difference, and after one iteration there are no more active messages, so 
the application terminates.
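A minimal, self-contained illustration of this equality difference (the class 
names are made up and unrelated to the GraphX API):
{code:java}
// Case classes compare by value; plain classes fall back to reference equality.
case class CaseVertex(id: Long, value: String)
class PlainVertex(val id: Long, val value: String)

object EqualityDemo extends App {
  println(CaseVertex(0L, "a") == CaseVertex(0L, "a"))           // true: same values
  println(new PlainVertex(0L, "a") == new PlainVertex(0L, "a")) // false: different objects
}
{code}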

> Pregel does not keep vertex state properly; fails to terminate 
> ---
>
> Key: SPARK-17097
> URL: https://issues.apache.org/jira/browse/SPARK-17097
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 1.6.0
> Environment: Scala 2.10.5, Spark 1.6.0 with GraphX and Pregel
>Reporter: Seth Bromberger
>
> Consider the following minimum example:
> {code:title=PregelBug.scala|borderStyle=solid}
> package testGraph
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, _}
> object PregelBug {
>   def main(args: Array[String]) = {
> //FIXME breaks if TestVertex is a case class; works if not case class
> case class TestVertex(inId: VertexId,
>  inData: String,
>  inLabels: collection.mutable.HashSet[String]) extends 
> Serializable {
>   val id = inId
>   val value = inData
>   val labels = inLabels
> }
> class TestLink(inSrc: VertexId, inDst: VertexId, inData: String) extends 
> Serializable  {
>   val src = inSrc
>   val dst = inDst
>   val data = inData
> }
> val startString = "XXXSTARTXXX"
> val conf = new SparkConf().setAppName("pregeltest").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val vertexes = Vector(
>   new TestVertex(0, "label0", collection.mutable.HashSet[String]()),
>   new TestVertex(1, "label1", collection.mutable.HashSet[String]())
> )
> val links = Vector(
>   new TestLink(0, 1, "linkData01")
> )
> val vertexes_packaged = vertexes.map(v => (v.id, v))
> val links_packaged = links.map(e => Edge(e.src, e.dst, e))
> val graph = Graph[TestVertex, 
> TestLink](sc.parallelize(vertexes_packaged), sc.parallelize(links_packaged))
> def vertexProgram (vertexId: VertexId, vdata: TestVertex, message: 
> Vector[String]): TestVertex = {
>   message.foreach {
> case `startString` =>
>   if (vdata.id == 0L)
> vdata.labels.add(vdata.value)
> case m =>
>   if (!vdata.labels.contains(m))
> vdata.labels.add(m)
>   }
>   new TestVertex(vdata.id, vdata.value, vdata.labels)
> }
> def sendMessage (triplet: EdgeTriplet[TestVertex, TestLink]): 
> Iterator[(VertexId, Vector[String])] = {
>   val srcLabels = triplet.srcAttr.labels
>   val dstLabels = triplet.dstAttr.labels
>   val msgsSrcDst = srcLabels.diff(dstLabels)
> .map(label => (triplet.dstAttr.id, Vector[String](label)))
>   val msgsDstSrc = dstLabels.diff(dstLabels)
> .map(label => (triplet.srcAttr.id, Vector[String](label)))
>   msgsSrcDst.toIterator ++ msgsDstSrc.toIterator
> }
> def mergeMessage (m1: Vector[String], m2: Vector[String]): Vector[String] 
> = m1.union(m2).distinct
> val g = graph.pregel(Vector[String](startString))(vertexProgram, 
> sendMessage, mergeMessage)
> println("---pregel done---")
> println("vertex info:")
> g.vertices.foreach(
>   v => {
> val labels = v._2.labels
> println(
>   "vertex " + v._1 +
> ": name = " + v._2.id +
> ", labels = " + labels)
>   }
> )
>   }
> }
> {code}
> This code never terminates even though we expect it to. To fix, we simply 
> remove the "case" designation for the TestVertex class (see FIXME comment), 
> and then it behaves as expected.
> (Apologies if this has been fixed in later versions; we're unfortunately 
> pegged to 2.10.5 / 1.6.0 for now.)






[jira] [Commented] (SPARK-5484) Pregel should checkpoint periodically to avoid StackOverflowError

2016-09-16 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15496857#comment-15496857
 ] 

ding commented on SPARK-5484:
-

Thank you for the kind reminder. However, as the code is almost ready, I will 
still send a PR in case someone is interested in reviewing it.

> Pregel should checkpoint periodically to avoid StackOverflowError
> -
>
> Key: SPARK-5484
> URL: https://issues.apache.org/jira/browse/SPARK-5484
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Reporter: Ankur Dave
>Assignee: Ankur Dave
>
> Pregel-based iterative algorithms with more than ~50 iterations begin to slow 
> down and eventually fail with a StackOverflowError due to Spark's lack of 
> support for long lineage chains. Instead, Pregel should checkpoint the graph 
> periodically.






[jira] [Created] (SPARK-17559) PeriodicGraphCheckpointer did not persist edges as expected in some cases

2016-09-15 Thread ding (JIRA)
ding created SPARK-17559:


 Summary: PeriodicGraphCheckpointer did not persist edges as expected in some cases
 Key: SPARK-17559
 URL: https://issues.apache.org/jira/browse/SPARK-17559
 Project: Spark
  Issue Type: Bug
  Components: MLlib
Reporter: ding
Priority: Minor


When PeriodicGraphCheckpointer is used to persist a graph, sometimes the edges are 
not persisted. Currently the graph is persisted only when the vertices' storage 
level is NONE, but there is a chance that the vertices' storage level is not NONE 
while the edges' storage level is NONE. For example, for a graph created by an 
outerJoinVertices operation, the vertices are automatically cached while the edges 
are not. In that case the edges will not be persisted when PeriodicGraphCheckpointer 
is used to persist the graph.

See the minimal example below:
{code}
val graphCheckpointer = new PeriodicGraphCheckpointer[Array[String], Int](2, sc)
val users = sc.textFile("data/graphx/users.txt")
  .map(line => line.split(","))
  .map(parts => (parts.head.toLong, parts.tail))
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

val graph = followerGraph.outerJoinVertices(users) {
  case (uid, deg, Some(attrList)) => attrList
  case (uid, deg, None) => Array.empty[String]
}
graphCheckpointer.update(graph)
{code}
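
A quick way to observe this behaviour, assuming the example above has just been run (the expected output noted in the comments is what I would anticipate, not a verified log):
{code}
// After outerJoinVertices the vertex RDD is typically already cached, while the
// edge RDD is not, and update() above leaves the edges at StorageLevel.NONE.
println(s"vertices storage level: ${graph.vertices.getStorageLevel}")
println(s"edges storage level:    ${graph.edges.getStorageLevel}")
{code}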






[jira] [Commented] (SPARK-5484) Pregel should checkpoint periodically to avoid StackOverflowError

2016-09-15 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15495131#comment-15495131
 ] 

ding commented on SPARK-5484:
-

I will work on this issue if nobody has taken it yet.

> Pregel should checkpoint periodically to avoid StackOverflowError
> -
>
> Key: SPARK-5484
> URL: https://issues.apache.org/jira/browse/SPARK-5484
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Reporter: Ankur Dave
>Assignee: Ankur Dave
>
> Pregel-based iterative algorithms with more than ~50 iterations begin to slow 
> down and eventually fail with a StackOverflowError due to Spark's lack of 
> support for long lineage chains. Instead, Pregel should checkpoint the graph 
> periodically.






[jira] [Commented] (SPARK-17097) Pregel does not keep vertex state properly; fails to terminate

2016-09-13 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15488609#comment-15488609
 ] 

ding commented on SPARK-17097:
--

I am afraid that the attached sample code failing to terminate when TestVertex is a 
case class is not caused by a bug in Pregel or GraphX. In the sample code, the vertex 
state (inLabels here) is updated in place, which is not supposed to happen. Because 
of that, after the transformation operations the original vertices have also been 
updated and, when VD is a case class, they have exactly the same value as the newly 
generated vertices in the code above. Since there is no difference between the vertex 
values, the EdgeTriplet is never updated: its dstLabels stays empty while srcLabels 
contains a value, so there is always an active message and Pregel never terminates.

One way to fix the problem is to remove the in-place update in vertexProgram by 
cloning the labels and applying the update to the clone. I have tried the code below 
and it works.
{code}
def vertexProgram(vertexId: VertexId, vdata: TestVertex, message: Vector[String]): TestVertex = {
  // Work on a clone so the original vertex value is never mutated in place.
  val labels = vdata.labels.clone()
  message.foreach {
    case `startString` =>
      if (vdata.id == 0L)
        labels.add(vdata.value)
    case m =>
      if (!labels.contains(m))
        labels.add(m)
  }
  new TestVertex(vdata.id, vdata.value, labels)
}
{code}
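
To make the role of case-class equality concrete, here is a small self-contained sketch (the class names are illustrative, not from the ticket): a case class compares structurally, so after an in-place update the old and new vertex values are indistinguishable, while a plain class compares by reference.
{code}
import scala.collection.mutable

// Case classes compare structurally, plain classes by reference.
case class CaseVertex(id: Long, labels: mutable.HashSet[String])
class PlainVertex(val id: Long, val labels: mutable.HashSet[String])

object EqualityDemo {
  def main(args: Array[String]): Unit = {
    val oldCase = CaseVertex(0L, mutable.HashSet("a"))
    val newCase = oldCase.copy()      // the mutable set is shared, not copied
    newCase.labels.add("b")           // "in-place" update leaks into oldCase too
    println(oldCase == newCase)       // true: old and new values are indistinguishable

    val oldPlain = new PlainVertex(0L, mutable.HashSet("a"))
    val newPlain = new PlainVertex(0L, oldPlain.labels.clone())
    newPlain.labels.add("b")
    println(oldPlain == newPlain)     // false: reference equality reports a change
  }
}
{code}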

Hope this information is helpful to you.

> Pregel does not keep vertex state properly; fails to terminate 
> ---
>
> Key: SPARK-17097
> URL: https://issues.apache.org/jira/browse/SPARK-17097
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 1.6.0
> Environment: Scala 2.10.5, Spark 1.6.0 with GraphX and Pregel
>Reporter: Seth Bromberger
>
> Consider the following minimum example:
> {code:title=PregelBug.scala|borderStyle=solid}
> package testGraph
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, _}
> object PregelBug {
>   def main(args: Array[String]) = {
> //FIXME breaks if TestVertex is a case class; works if not case class
> case class TestVertex(inId: VertexId,
>  inData: String,
>  inLabels: collection.mutable.HashSet[String]) extends 
> Serializable {
>   val id = inId
>   val value = inData
>   val labels = inLabels
> }
> class TestLink(inSrc: VertexId, inDst: VertexId, inData: String) extends 
> Serializable  {
>   val src = inSrc
>   val dst = inDst
>   val data = inData
> }
> val startString = "XXXSTARTXXX"
> val conf = new SparkConf().setAppName("pregeltest").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val vertexes = Vector(
>   new TestVertex(0, "label0", collection.mutable.HashSet[String]()),
>   new TestVertex(1, "label1", collection.mutable.HashSet[String]())
> )
> val links = Vector(
>   new TestLink(0, 1, "linkData01")
> )
> val vertexes_packaged = vertexes.map(v => (v.id, v))
> val links_packaged = links.map(e => Edge(e.src, e.dst, e))
> val graph = Graph[TestVertex, 
> TestLink](sc.parallelize(vertexes_packaged), sc.parallelize(links_packaged))
> def vertexProgram (vertexId: VertexId, vdata: TestVertex, message: 
> Vector[String]): TestVertex = {
>   message.foreach {
> case `startString` =>
>   if (vdata.id == 0L)
> vdata.labels.add(vdata.value)
> case m =>
>   if (!vdata.labels.contains(m))
> vdata.labels.add(m)
>   }
>   new TestVertex(vdata.id, vdata.value, vdata.labels)
> }
> def sendMessage (triplet: EdgeTriplet[TestVertex, TestLink]): 
> Iterator[(VertexId, Vector[String])] = {
>   val srcLabels = triplet.srcAttr.labels
>   val dstLabels = triplet.dstAttr.labels
>   val msgsSrcDst = srcLabels.diff(dstLabels)
> .map(label => (triplet.dstAttr.id, Vector[String](label)))
>   val msgsDstSrc = dstLabels.diff(srcLabels)
> .map(label => (triplet.srcAttr.id, Vector[String](label)))
>   msgsSrcDst.toIterator ++ msgsDstSrc.toIterator
> }
> def mergeMessage (m1: Vector[String], m2: Vector[String]): Vector[String] 
> = m1.union(m2).distinct
> val g = graph.pregel(Vector[String](startString))(vertexProgram, 
> sendMessage, mergeMessage)
> println("---pregel done---")
> println("vertex info:")
> g.vertices.foreach(
>   v => {
> val labels = v._2.labels
> println(
>   "vertex " + v._1 +
> ": name = " + v._2.id +
> ", labels = " + labels)
>   }
> )
>   }
> }
> {code}
> This code never terminates even though we expect it to. To fix, we simply 
> remove the "case" designation for the TestVertex class (see FIXME comment), 

[jira] [Commented] (SPARK-16071) Not sufficient array size checks to avoid integer overflows in Tungsten

2016-06-21 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341354#comment-15341354
 ] 

ding commented on SPARK-16071:
--

OK, I see. Thank you for your clarification.

> Not sufficient array size checks to avoid integer overflows in Tungsten
> ---
>
> Key: SPARK-16071
> URL: https://issues.apache.org/jira/browse/SPARK-16071
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Xiangrui Meng
>Priority: Critical
>
> Several bugs have been found caused by integer overflows in Tungsten. This 
> JIRA is for taking a final pass before 2.0 release to reduce potential bugs 
> and issues. We should do at least the following:
> * Raise exception early instead of later throwing NegativeArraySize (which is 
> slow and might cause silent errors)
> * Document clearly the largest array size we support in DataFrames.
> To reproduce one of the issues:
> {code}
> val n = 1e8.toInt // try 2e8, 3e8
> sc.parallelize(0 until 1, 1).map(i => new 
> Array[Int](n)).toDS.map(_.size).show()
> {code}
> Result:
> * n=1e8: correct but slow (see SPARK-16043)
> * n=2e8: NegativeArraySize exception
> {code:none}
> java.lang.NegativeArraySizeException
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:61)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.RDDScanExec$$anonfun$doExecute$1$$anonfun$apply$3.apply(ExistingRDD.scala:123)
>   at 
> org.apache.spark.sql.execution.RDDScanExec$$anonfun$doExecute$1$$anonfun$apply$3.apply(ExistingRDD.scala:121)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:780)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:780)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> * n=3e8: NegativeArraySize exception but raised at a different location
> {code:none}
> java.lang.RuntimeException: Error while encoding: 
> java.lang.NegativeArraySizeException
> newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) AS 
> value#108
> +- newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData)
>+- input[0, [I, true]
>   at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:257)
>   at 
> org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:430)
>   at 
> org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:430)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:780)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:780)
>   at 
> 

[jira] [Commented] (SPARK-16071) Not sufficient array size checks to avoid integer overflows in Tungsten

2016-06-21 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341318#comment-15341318
 ] 

ding commented on SPARK-16071:
--

The exceptions are raised in different locations: one happens in the encoder (n = 3e8) 
and the other while executing the logical plan (n = 2e8) during createDataset. Both 
exceptions are thrown from the grow function in BufferHolder, so I think we can add 
an array size check there. I will send a PR later.
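
For context, a minimal sketch of the kind of guard being proposed (illustrative Scala, not the actual BufferHolder.grow code; the max-array-size constant is an assumption based on common JVM limits):
{code}
// Fail fast with a clear message instead of letting the computed size overflow
// into a negative array length.
def checkedGrowSize(currentSize: Int, neededSize: Int): Int = {
  val maxArraySize = Int.MaxValue - 8  // assumed practical JVM array size limit
  if (neededSize < 0 || neededSize > maxArraySize - currentSize) {
    throw new UnsupportedOperationException(
      s"Cannot grow buffer by $neededSize bytes: would exceed max array size $maxArraySize")
  }
  val doubled = currentSize.toLong * 2
  math.min(math.max(doubled, (currentSize + neededSize).toLong), maxArraySize.toLong).toInt
}
{code}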

> Not sufficient array size checks to avoid integer overflows in Tungsten
> ---
>
> Key: SPARK-16071
> URL: https://issues.apache.org/jira/browse/SPARK-16071
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Xiangrui Meng
>Priority: Critical
>
> Several bugs have been found caused by integer overflows in Tungsten. This 
> JIRA is for taking a final pass before 2.0 release to reduce potential bugs 
> and issues. We should do at least the following:
> * Raise exception early instead of later throwing NegativeArraySize (which is 
> slow and might cause silent errors)
> * Document clearly the largest array size we support in DataFrames.
> To reproduce one of the issues:
> {code}
> val n = 1e8.toInt // try 2e8, 3e8
> sc.parallelize(0 until 1, 1).map(i => new 
> Array[Int](n)).toDS.map(_.size).show()
> {code}
> Result:
> * n=1e8: correct but slow (see SPARK-16043)
> * n=2e8: NegativeArraySize exception
> {code:none}
> java.lang.NegativeArraySizeException
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:61)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.RDDScanExec$$anonfun$doExecute$1$$anonfun$apply$3.apply(ExistingRDD.scala:123)
>   at 
> org.apache.spark.sql.execution.RDDScanExec$$anonfun$doExecute$1$$anonfun$apply$3.apply(ExistingRDD.scala:121)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:780)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:780)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> * n=3e8: NegativeArraySize exception but raised at a different location
> {code:none}
> java.lang.RuntimeException: Error while encoding: 
> java.lang.NegativeArraySizeException
> newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) AS 
> value#108
> +- newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData)
>+- input[0, [I, true]
>   at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:257)
>   at 
> org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:430)
>   at 
> org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:430)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> 

[jira] [Commented] (SPARK-15562) Temp directory is not deleted after program exit in DataFrameExample

2016-05-26 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303326#comment-15303326
 ] 

ding commented on SPARK-15562:
--

OK, I will check whether a similar JIRA already exists before creating a new one next time.

> Temp directory is not deleted after program exit in DataFrameExample
> 
>
> Key: SPARK-15562
> URL: https://issues.apache.org/jira/browse/SPARK-15562
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 1.6.0
>Reporter: ding
>Priority: Minor
>
> The temp directory used to save records is not deleted after the program exits in 
> DataFrameExample. Although deleteOnExit is called, it does not work because the 
> directory is not empty. A similar thing happens in ContextCleanerSuite.






[jira] [Created] (SPARK-15562) Temp directory is not deleted after program exit in DataFrameExample

2016-05-26 Thread ding (JIRA)
ding created SPARK-15562:


 Summary: Temp directory is not deleted after program exit in 
DataFrameExample
 Key: SPARK-15562
 URL: https://issues.apache.org/jira/browse/SPARK-15562
 Project: Spark
  Issue Type: Bug
  Components: ML
Affects Versions: 1.6.0
Reporter: ding
Priority: Minor


The temp directory used to save records is not deleted after the program exits in 
DataFrameExample. Although deleteOnExit is called, it does not work because the 
directory is not empty. A similar thing happens in ContextCleanerSuite.
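
A small sketch of one possible cleanup approach (illustrative only; the directory path is an assumption): delete the tree recursively from a shutdown hook, since File.deleteOnExit only removes empty directories.
{code}
import java.io.File

// Recursively delete a directory tree; deleteOnExit alone cannot do this.
def deleteRecursively(f: File): Unit = {
  if (f.isDirectory) {
    Option(f.listFiles()).toSeq.flatten.foreach(deleteRecursively)
  }
  f.delete()
}

val tempDir = new File(System.getProperty("java.io.tmpdir"), "dataframe-example")
sys.addShutdownHook {
  deleteRecursively(tempDir)
}
{code}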






[jira] [Created] (SPARK-15172) Warning message should explicitly tell user that initial coefficients are ignored if their size doesn't match the expected size in LogisticRegression

2016-05-06 Thread ding (JIRA)
ding created SPARK-15172:


 Summary: Warning message should explicitly tell user that initial 
coefficients are ignored if their size doesn't match the expected size in 
LogisticRegression
 Key: SPARK-15172
 URL: https://issues.apache.org/jira/browse/SPARK-15172
 Project: Spark
  Issue Type: Improvement
  Components: ML
Reporter: ding
Priority: Trivial


From the ML LogisticRegression code logic, if the size of the initial coefficients 
doesn't match the expected size, the initial coefficients are ignored. We should 
explicitly tell the user about this. Besides, when a size mismatch happens, logging 
the size of the initial coefficients is more straightforward than logging the 
initial coefficient values.
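
A tiny sketch of the proposed message (names are illustrative and println stands in for the ML logger; this is not the actual LogisticRegression code):
{code}
def warnIfInitialCoefficientsIgnored(providedSize: Int, expectedSize: Int): Unit = {
  if (providedSize != expectedSize) {
    // Log the sizes rather than the full coefficient vector.
    println(s"WARN: user-supplied initial coefficients are ignored: " +
      s"size $providedSize does not match expected size $expectedSize")
  }
}
{code}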






[jira] [Reopened] (SPARK-14969) Remove unnecessary compute function in LogisticGradient

2016-04-27 Thread ding (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ding reopened SPARK-14969:
--

> Remove unnecessary compute function in LogisticGradient
> ---
>
> Key: SPARK-14969
> URL: https://issues.apache.org/jira/browse/SPARK-14969
> Project: Spark
>  Issue Type: Improvement
>  Components: MLlib
>Reporter: ding
>Priority: Trivial
>
> The compute function is already implemented in the base class Gradient. There is 
> no need for the derived class LogisticGradient to have the same implementation of 
> compute.






[jira] [Resolved] (SPARK-14969) Remove unnecessary compute function in LogisticGradient

2016-04-27 Thread ding (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ding resolved SPARK-14969.
--
Resolution: Fixed

> Remove unnecessary compute function in LogisticGradient
> ---
>
> Key: SPARK-14969
> URL: https://issues.apache.org/jira/browse/SPARK-14969
> Project: Spark
>  Issue Type: Improvement
>  Components: MLlib
>Reporter: ding
>Priority: Trivial
>
> The compute function is already implemented in the base class Gradient. There is 
> no need for the derived class LogisticGradient to have the same implementation of 
> compute.






[jira] [Created] (SPARK-14969) Remove unnecessary compute function in LogisticGradient

2016-04-27 Thread ding (JIRA)
ding created SPARK-14969:


 Summary: Remove unnecessary compute function in LogisticGradient
 Key: SPARK-14969
 URL: https://issues.apache.org/jira/browse/SPARK-14969
 Project: Spark
  Issue Type: Improvement
  Components: MLlib
Reporter: ding
Priority: Trivial


The compute function is already implemented in the base class Gradient. There is 
no need for the derived class LogisticGradient to have the same implementation of 
compute.






[jira] [Commented] (SPARK-14889) scala.MatchError: NONE (of class scala.Enumeration$Val) when spark.scheduler.mode=NONE

2016-04-25 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15256061#comment-15256061
 ] 

ding commented on SPARK-14889:
--

@Jacek Laskowski, do you mind if I take this JIRA and provide a fix for the issue?

> scala.MatchError: NONE (of class scala.Enumeration$Val) when 
> spark.scheduler.mode=NONE
> --
>
> Key: SPARK-14889
> URL: https://issues.apache.org/jira/browse/SPARK-14889
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.0.0
>Reporter: Jacek Laskowski
>Priority: Minor
>
> When {{TaskSchedulerImpl}} is initialized it pattern matches on acceptable 
> scheduling modes - {{FIFO}} and {{FAIR}} modes - but misses {{NONE}}.
> It should at least pattern match the case and throw a more meaningful 
> exception.
> {code}
> ➜  spark git:(master) ✗ ./bin/spark-shell -c spark.scheduler.mode=NONE
> 16/04/25 09:15:00 ERROR SparkContext: Error initializing SparkContext.
> scala.MatchError: NONE (of class scala.Enumeration$Val)
>   at org.apache.spark.scheduler.Pool.<init>(Pool.scala:53)
>   at 
> org.apache.spark.scheduler.TaskSchedulerImpl.initialize(TaskSchedulerImpl.scala:131)
>   at 
> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2352)
>   at org.apache.spark.SparkContext.<init>(SparkContext.scala:492)
> {code}
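
A self-contained sketch of the fix direction (the enumeration here mirrors but is not Spark's actual SchedulingMode, and the scheduler-builder wiring is omitted):
{code}
// Match the configured mode explicitly and fail with a clear message for
// anything other than FAIR or FIFO, instead of a bare scala.MatchError.
object DemoSchedulingMode extends Enumeration {
  val FAIR, FIFO, NONE = Value
}

def validateSchedulingMode(mode: DemoSchedulingMode.Value): Unit = mode match {
  case DemoSchedulingMode.FAIR | DemoSchedulingMode.FIFO => ()  // supported
  case other =>
    throw new IllegalArgumentException(
      s"Unsupported spark.scheduler.mode: $other. Use FAIR or FIFO.")
}
{code}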






[jira] [Created] (SPARK-13378) Add tee method to RDD

2016-02-18 Thread Richard Ding (JIRA)
Richard Ding created SPARK-13378:


 Summary: Add tee method to RDD
 Key: SPARK-13378
 URL: https://issues.apache.org/jira/browse/SPARK-13378
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.6.0
Reporter: Richard Ding


In our application, we sometimes need to save partial/intermediate results to side 
files in the middle of a data pipeline/DAG. The only way to do this now is to use 
the saveAsTextFile method, which only runs at the end of a pipeline; otherwise 
multiple jobs are needed. We’ve implemented a ‘tee’ method on RDD that is similar 
to the Unix tee utility. Below are the proposed methods:

{code}
def tee(path: String) : RDD[T]
{code}
Return a new RDD that is the same as this RDD, but also save a copy of this RDD 
to a text file, using the string representation of its elements.

{code}
def tee(path: String, f: (T) => Boolean): RDD[T]
{code}
Return a new RDD that is the same as this RDD, but also save to a text file a 
copy of the elements in this RDD that satisfy a predicate, using the string 
representation of its elements.

These methods can be used in RDD pipelines much like the tee utility in a Unix 
command pipeline, for example:

{code}
sc.textFile(dataFile).map(x => x.split("\t"))
  .map(x => (x(0), x(1).toInt, x(2)))
  .tee("output/tee-data-1")
  .tee("output/tee-data-2", x => x._2 >= 10)
  .groupBy(x => x._1)
  .saveAsTextFile("output/out-data")
{code}
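
For comparison, a hedged sketch of how a user could approximate tee today on top of the existing RDD API (names are illustrative; note this triggers a separate job for the side output, which is exactly what the proposal wants to avoid):
{code}
import org.apache.spark.rdd.RDD

// Write a side copy of the (optionally filtered) elements and hand the RDD back
// so it can keep flowing through the pipeline.
def teeApprox[T](rdd: RDD[T], path: String, p: T => Boolean = (_: T) => true): RDD[T] = {
  rdd.filter(p).map(_.toString).saveAsTextFile(path)  // extra job, unlike the proposed tee
  rdd
}
{code}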







[jira] [Commented] (SPARK-5556) Latent Dirichlet Allocation (LDA) using Gibbs sampler

2015-09-09 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738090#comment-14738090
 ] 

ding commented on SPARK-5556:
-

We have made the Spark package; it can be found here: 
http://spark-packages.org/package/intel-analytics/TopicModeling

> Latent Dirichlet Allocation (LDA) using Gibbs sampler 
> --
>
> Key: SPARK-5556
> URL: https://issues.apache.org/jira/browse/SPARK-5556
> Project: Spark
>  Issue Type: New Feature
>  Components: MLlib
>Reporter: Guoqiang Li
>Assignee: Pedro Rodriguez
> Attachments: LDA_test.xlsx, spark-summit.pptx
>
>







[jira] [Commented] (SPARK-5556) Latent Dirichlet Allocation (LDA) using Gibbs sampler

2015-08-14 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14696677#comment-14696677
 ] 

ding commented on SPARK-5556:
-

The code can be found at https://github.com/intel-analytics/TopicModeling.

There is an example in the package; you can try Gibbs sampling LDA or online LDA 
by setting --optimizer to gibbs or online.


 Latent Dirichlet Allocation (LDA) using Gibbs sampler 
 --

 Key: SPARK-5556
 URL: https://issues.apache.org/jira/browse/SPARK-5556
 Project: Spark
  Issue Type: New Feature
  Components: MLlib
Reporter: Guoqiang Li
Assignee: Pedro Rodriguez
 Attachments: LDA_test.xlsx, spark-summit.pptx









[jira] [Created] (SPARK-5418) Output directory for shuffle should consider the remaining space of each directory set in conf

2015-01-26 Thread ding (JIRA)
ding created SPARK-5418:
---

 Summary: Output directory for shuffle should consider the remaining space 
of each directory set in conf
 Key: SPARK-5418
 URL: https://issues.apache.org/jira/browse/SPARK-5418
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.2.0
 Environment: Ubuntu, others should be similar
Reporter: ding
Priority: Minor


I set multiple directories in the conf spark.local.dir as scratch space; one of 
them (e.g. /mnt/disk1) has 30G of free space while the others (e.g. /mnt/disk2) 
have 100G. In the current version, Spark uses a hash to decide which directory is 
used for scratch space, which means each directory has the same chance of being 
chosen. After hundreds of iterations of PageRank, a "No space left" exception is 
thrown and the driver crashes. This does not make sense, since there is still 70G+ 
of free space in the other directories. We should take the remaining space of each 
directory into account when deciding which directory should hold the map output. 
I will send a PR for this.
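
A rough sketch of the direction I have in mind (illustrative only, not the actual DiskBlockManager code): weight the choice of local directory by its usable space instead of using a plain hash.
{code}
import java.io.File
import scala.util.Random

// Pick a scratch directory with probability proportional to its usable space,
// so nearly full disks are chosen less often.
def pickLocalDir(dirs: Seq[File]): File = {
  val free = dirs.map(d => math.max(d.getUsableSpace, 1L))
  var remaining = (Random.nextDouble() * free.sum).toLong
  for ((dir, space) <- dirs.zip(free)) {
    if (remaining < space) return dir
    remaining -= space
  }
  dirs.last
}
{code}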






[jira] [Commented] (SPARK-4105) FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle

2015-01-24 Thread ding (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14290961#comment-14290961
 ] 

ding commented on SPARK-4105:
-

I hit this error when running PageRank (it cannot be reproduced consistently, as I 
only hit it once). I am not using the KryoSerializer; I am using the default 
serializer. The Spark code was taken from trunk on 2015/1/19, which should be later 
than Spark 1.2.0.

15/01/23 23:32:57 WARN scheduler.TaskSetManager: Lost task 347.0 in stage 
9461.0 (TID 302687, sr213): FetchFailed(BlockManagerId(13, sr207, 49805), 
shuffleId=399, mapId=461, reduceId=347, message=
org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.graphx.impl.VertexPartitionBaseOps.aggregateUsingIndex(VertexPartitionBaseOps.scala:207)
at 
org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$5$$anonfun$apply$4.apply(VertexRDDImpl.scala:171)
at 
org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$5$$anonfun$apply$4.apply(VertexRDDImpl.scala:171)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:113)
at 
org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:111)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:65)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
at 
org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:143)
at 
org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1165)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:299)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:299)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)



 FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based 
 shuffle
 -

 Key: SPARK-4105
 URL: