[jira] [Created] (SPARK-39531) Show "Name" in Spark master UI for drivers

2022-06-20 Thread Jack Hu (Jira)
Jack Hu created SPARK-39531:
---

 Summary: Show "Name" in Spark master UI for drivers
 Key: SPARK-39531
 URL: https://issues.apache.org/jira/browse/SPARK-39531
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.2.1
Reporter: Jack Hu


For applications in the Spark master UI of a standalone cluster, the "Name" 
(spark.app.name) is shown for running and completed applications, but for 
drivers the master UI only shows the main class name. It is hard to map a class 
name to an application name when there are many applications and drivers (in 
some cases the class names are the same but the application names differ in a 
shared cluster). Could the "Name" (spark.app.name) also be shown for drivers in 
the Spark master UI?
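
For context, a brief sketch (not from the ticket; the application name below is 
made up) of where the requested value comes from: the "Name" column is simply 
whatever the job sets as spark.app.name, for example via SparkSession or 
--conf spark.app.name on spark-submit.
{code:java}
import org.apache.spark.sql.SparkSession

// spark.app.name is what the master UI shows as "Name" for applications;
// the request is to surface the same value for driver rows as well.
val spark = SparkSession.builder()
  .appName("billing-daily-aggregation")   // hypothetical application name
  .getOrCreate()
{code}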






[jira] [Created] (SPARK-36672) Using value from map in grouping sets result org.apache.spark.sql.AnalysisException

2021-09-05 Thread Jack Hu (Jira)
Jack Hu created SPARK-36672:
---

 Summary: Using value from map in grouping sets result 
org.apache.spark.sql.AnalysisException
 Key: SPARK-36672
 URL: https://issues.apache.org/jira/browse/SPARK-36672
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.0
Reporter: Jack Hu


Steps to reproduce:
 # create a table with a map column
{code:java}
create table test (int_value INT, dims MAP<STRING, STRING>) using parquet{code}
 # Run the following query:
{code:java}
select int_value, count(1)
from test
group by int_value, dims.dim_x, dims.dim_y
grouping sets ( (int_value, dims.dim_x), (int_value, dims.dim_y)){code}
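
For reference, one possible workaround (an untested sketch, not part of the 
original report) is to extract the map values into plain columns in a subquery 
before applying the grouping sets; the call stack of the original failure 
follows below.
{code:java}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Untested workaround sketch: materialize the map lookups as real columns
// first, then group on those attributes instead of dims.dim_x / dims.dim_y.
spark.sql("""
  SELECT int_value, count(1)
  FROM (
    SELECT int_value, dims['dim_x'] AS dim_x, dims['dim_y'] AS dim_y
    FROM test
  ) t
  GROUP BY int_value, dim_x, dim_y
  GROUPING SETS ((int_value, dim_x), (int_value, dim_y))
""").show()
{code}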


The call stack:
{noformat}
org.apache.spark.sql.AnalysisException: dims#34[dim_x] AS dim_x#35 doesn't show 
up in the GROUP BY list ArrayBuffer(int_value#33 AS int_value#41, 
dims#34[dim_x] AS dim_x#37 AS dim_x#42, dims#34[dim_y] AS dim_y#38 AS dim_y#43);
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19$$anonfun$apply$49$$anonfun$21.apply(Analyzer.scala:387)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19$$anonfun$apply$49$$anonfun$21.apply(Analyzer.scala:387)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19$$anonfun$apply$49.apply(Analyzer.scala:386)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19$$anonfun$apply$49.apply(Analyzer.scala:385)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19.apply(Analyzer.scala:385)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19.apply(Analyzer.scala:384)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$.constructExpand(Analyzer.scala:384)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveGroupingAnalytics$$constructAggregate(Analyzer.scala:448)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$apply$6.applyOrElse(Analyzer.scala:485)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$apply$6.applyOrElse(Analyzer.scala:473)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$.apply(Analyzer.scala:473)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$.apply(Analyzer.scala:287)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
{noformat}

[jira] [Updated] (SPARK-36252) Add log files rolling policy for driver running in cluster mode with spark standalone cluster

2021-07-21 Thread Jack Hu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu updated SPARK-36252:

Description: 
For a long-running driver in cluster mode there is no log rolling policy, so the 
logs (stdout/stderr) may occupy a lot of space and the user needs an external 
tool to clean up old logs, which is not user friendly. 

For executors, the following 5 configurations are used to control the log file 
rolling policy:
{code:java}
spark.executor.logs.rolling.maxRetainedFiles
spark.executor.logs.rolling.enableCompression
spark.executor.logs.rolling.maxSize
spark.executor.logs.rolling.strategy
spark.executor.logs.rolling.time.interval
{code}

For a driver running in cluster mode, two options:
1. reuse the executor settings
2. similar to the executor: add the following configurations (only applying to 
the stderr/stdout of drivers in cluster mode)
{code:java}
spark.driver.logs.rolling.maxRetainedFiles
spark.driver.logs.rolling.enableCompression
spark.driver.logs.rolling.maxSize
spark.driver.logs.rolling.strategy
spark.driver.logs.rolling.time.interval
{code}

#2 seems better, do you agree?

  was:
For a long running driver in cluster mode, there is no rolling policy, the log 
(stdout/stderr) may accupy lots of space, user needs a external tool to clean 
the old logs, it's not friendly. 

For executor, following 5 configurations is used to control the log file 
rolling policy:
{code:java}
spark.executor.logs.rolling.maxRetainedFiles
spark.executor.logs.rolling.enableCompression
spark.executor.logs.rolling.maxSize
spark.executor.logs.rolling.strategy
spark.executor.logs.rolling.time.interval
{code}

For driver running in cluster mode:
1. reuse the executor settings
2. similar to executor: add following configurations (only works for 
stderr/stdout for driver in cluster mode)
{code:java}
spark.driver.logs.rolling.maxRetainedFiles
spark.driver.logs.rolling.enableCompression
spark.driver.logs.rolling.maxSize
spark.driver.logs.rolling.strategy
spark.driver.logs.rolling.time.interval
{code}

#2 seems better, do you agree?


> Add log files rolling policy for driver running in cluster mode with spark 
> standalone cluster
> -
>
> Key: SPARK-36252
> URL: https://issues.apache.org/jira/browse/SPARK-36252
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.1.2
>Reporter: Jack Hu
>Priority: Major
>
> For a long-running driver in cluster mode there is no log rolling policy, so 
> the logs (stdout/stderr) may occupy a lot of space and the user needs an 
> external tool to clean up old logs, which is not user friendly. 
> For executors, the following 5 configurations are used to control the log 
> file rolling policy:
> {code:java}
> spark.executor.logs.rolling.maxRetainedFiles
> spark.executor.logs.rolling.enableCompression
> spark.executor.logs.rolling.maxSize
> spark.executor.logs.rolling.strategy
> spark.executor.logs.rolling.time.interval
> {code}
> For a driver running in cluster mode, two options:
> 1. reuse the executor settings
> 2. similar to the executor: add the following configurations (only applying 
> to the stderr/stdout of drivers in cluster mode)
> {code:java}
> spark.driver.logs.rolling.maxRetainedFiles
> spark.driver.logs.rolling.enableCompression
> spark.driver.logs.rolling.maxSize
> spark.driver.logs.rolling.strategy
> spark.driver.logs.rolling.time.interval
> {code}
> #2 seems better, do you agree?






[jira] [Created] (SPARK-36252) Add log files rolling policy for driver running in cluster mode with spark standalone cluster

2021-07-21 Thread Jack Hu (Jira)
Jack Hu created SPARK-36252:
---

 Summary: Add log files rolling policy for driver running in 
cluster mode with spark standalone cluster
 Key: SPARK-36252
 URL: https://issues.apache.org/jira/browse/SPARK-36252
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.1.2
Reporter: Jack Hu


For a long-running driver in cluster mode there is no log rolling policy, so the 
logs (stdout/stderr) may occupy a lot of space and the user needs an external 
tool to clean up old logs, which is not user friendly. 

For executors, the following 5 configurations are used to control the log file 
rolling policy:
{code:java}
spark.executor.logs.rolling.maxRetainedFiles
spark.executor.logs.rolling.enableCompression
spark.executor.logs.rolling.maxSize
spark.executor.logs.rolling.strategy
spark.executor.logs.rolling.time.interval
{code}

For a driver running in cluster mode, two options:
1. reuse the executor settings
2. similar to the executor: add the following configurations (only applying to 
the stderr/stdout of drivers in cluster mode)
{code:java}
spark.driver.logs.rolling.maxRetainedFiles
spark.driver.logs.rolling.enableCompression
spark.driver.logs.rolling.maxSize
spark.driver.logs.rolling.strategy
spark.driver.logs.rolling.time.interval
{code}

#2 seems better, do you agree?
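
For illustration only, assuming option #2 were adopted, the proposed driver-side 
keys might be supplied like this (these keys are the proposal above and do not 
exist in Spark today; the values mirror the executor-side equivalents):
{code:java}
import org.apache.spark.SparkConf

// Hypothetical: spark.driver.logs.rolling.* is the ticket's proposal
// (option #2), not an existing Spark configuration.
val conf = new SparkConf()
  .set("spark.driver.logs.rolling.strategy", "time")
  .set("spark.driver.logs.rolling.time.interval", "daily")
  .set("spark.driver.logs.rolling.maxRetainedFiles", "7")
  .set("spark.driver.logs.rolling.enableCompression", "true")
{code}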






[jira] [Comment Edited] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure

2021-07-01 Thread Jack Hu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17372425#comment-17372425
 ] 

Jack Hu edited comment on SPARK-35027 at 7/1/21, 6:42 AM:
--

Of course, the "stop" in FileAppender does nothing but set a flag. 

The exception will be thrown in 
"[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]", 
but the closure in the finally block only closes the [output stream (to 
file)|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L79] 
and leaves the "inputStream" open, which is the pipe's output stream. 


was (Author: jhu):
Of course, the "stop" in FileAppender does nothing but set a flag. 

The exception will be thrown in 
"[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59];,
 but the cloure in finally only closes the [output stream (to 
file)|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L79]],
 but leave the "inputStream" open., which is the pipe's output stream. 

> Close the inputStream in FileAppender when writing the logs failure
> ---
>
> Key: SPARK-35027
> URL: https://issues.apache.org/jira/browse/SPARK-35027
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.1
>Reporter: Jack Hu
>Priority: Major
>
> In a Spark cluster, the ExecutorRunner uses FileAppender to redirect the 
> stdout/stderr of executors to files. When writing fails for some reason 
> (e.g., disk full), the FileAppender only closes the stream to the file but 
> leaves the pipe's stdout/stderr open, so subsequent write operations on the 
> executor side may hang. 
> Should the inputStream in FileAppender be closed as well?






[jira] [Comment Edited] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure

2021-07-01 Thread Jack Hu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17372425#comment-17372425
 ] 

Jack Hu edited comment on SPARK-35027 at 7/1/21, 6:42 AM:
--

Of course, the "stop" in FileAppender does nothing but set a flag. 

The exception will be thrown in 
"[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]", 
but the closure in the finally block only closes the [output stream (to 
file)|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L79] 
and leaves the "inputStream" open, which is the pipe's output stream. 


was (Author: jhu):
Of course, the "stop" in FileAppender does nothing but set a flag. 

The exception will be thrown in 
"[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59];,
 but the cloure in finally only closes the output stream (to file), [but leave 
the 
"inp|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L79]utStream;
 open., which is the pipe's output stream. 

> Close the inputStream in FileAppender when writing the logs failure
> ---
>
> Key: SPARK-35027
> URL: https://issues.apache.org/jira/browse/SPARK-35027
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.1
>Reporter: Jack Hu
>Priority: Major
>
> In a Spark cluster, the ExecutorRunner uses FileAppender to redirect the 
> stdout/stderr of executors to files. When writing fails for some reason 
> (e.g., disk full), the FileAppender only closes the stream to the file but 
> leaves the pipe's stdout/stderr open, so subsequent write operations on the 
> executor side may hang. 
> Should the inputStream in FileAppender be closed as well?






[jira] [Comment Edited] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure

2021-07-01 Thread Jack Hu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17372425#comment-17372425
 ] 

Jack Hu edited comment on SPARK-35027 at 7/1/21, 6:40 AM:
--

Of course, the "stop" in FileAppender does nothing but set a flag. 

The exception will be thrown in 
"[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]", 
but the closure in the finally block only closes the output stream (to file) 
and leaves the 
["inputStream"|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L79] 
open, which is the pipe's output stream. 


was (Author: jhu):
Of course, the "stop" in FileAppender does nothing but set a flag. 

The exception will be thrown in 
"[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59];,
 but the cloure in finally only closes the output stream (to file), but leave 
the "inputStream" open., which is the pipe's output stream. 

> Close the inputStream in FileAppender when writing the logs failure
> ---
>
> Key: SPARK-35027
> URL: https://issues.apache.org/jira/browse/SPARK-35027
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.1
>Reporter: Jack Hu
>Priority: Major
>
> In a Spark cluster, the ExecutorRunner uses FileAppender to redirect the 
> stdout/stderr of executors to files. When writing fails for some reason 
> (e.g., disk full), the FileAppender only closes the stream to the file but 
> leaves the pipe's stdout/stderr open, so subsequent write operations on the 
> executor side may hang. 
> Should the inputStream in FileAppender be closed as well?






[jira] [Comment Edited] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure

2021-07-01 Thread Jack Hu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17372425#comment-17372425
 ] 

Jack Hu edited comment on SPARK-35027 at 7/1/21, 6:39 AM:
--

Of course, the "stop" in FileAppender does nothing but set a flag. 

The exception will be thrown in 
"[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]", 
but the closure in the finally block only closes the output stream (to file) 
and leaves the "inputStream" open, which is the pipe's output stream. 


was (Author: jhu):
Of course, the "stop" in FileAppender does nothing but set a flag. 

The exception will be thrown in "appendStreamToFile", but the cloure in finally 
only closes the output stream (to file), but leave the "inputStream" open., 
which is the pipe's output stream. 

> Close the inputStream in FileAppender when writing the logs failure
> ---
>
> Key: SPARK-35027
> URL: https://issues.apache.org/jira/browse/SPARK-35027
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.1
>Reporter: Jack Hu
>Priority: Major
>
> In a Spark cluster, the ExecutorRunner uses FileAppender to redirect the 
> stdout/stderr of executors to files. When writing fails for some reason 
> (e.g., disk full), the FileAppender only closes the stream to the file but 
> leaves the pipe's stdout/stderr open, so subsequent write operations on the 
> executor side may hang. 
> Should the inputStream in FileAppender be closed as well?






[jira] [Commented] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure

2021-07-01 Thread Jack Hu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17372425#comment-17372425
 ] 

Jack Hu commented on SPARK-35027:
-

Of course, the "stop" in FileAppender does nothing but set a flag. 

The exception will be thrown in "appendStreamToFile", but the cloure in finally 
only closes the output stream (to file), but leave the "inputStream" open., 
which is the pipe's output stream. 

> Close the inputStream in FileAppender when writing the logs failure
> ---
>
> Key: SPARK-35027
> URL: https://issues.apache.org/jira/browse/SPARK-35027
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.1
>Reporter: Jack Hu
>Priority: Major
>
> In a Spark cluster, the ExecutorRunner uses FileAppender to redirect the 
> stdout/stderr of executors to files. When writing fails for some reason 
> (e.g., disk full), the FileAppender only closes the stream to the file but 
> leaves the pipe's stdout/stderr open, so subsequent write operations on the 
> executor side may hang. 
> Should the inputStream in FileAppender be closed as well?






[jira] [Updated] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure

2021-04-12 Thread Jack Hu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu updated SPARK-35027:

Description: 
In a Spark cluster, the ExecutorRunner uses FileAppender to redirect the 
stdout/stderr of executors to files. When writing fails for some reason (e.g., 
disk full), the FileAppender only closes the stream to the file but leaves the 
pipe's stdout/stderr open, so subsequent write operations on the executor side 
may hang. 

Should the inputStream in FileAppender be closed as well?

  was:
The ExecutorRunner uses FileAppender  to redirect the stdout/stderr of 
executors to file, when the writing processing is failure due to some reasons: 
disk full, the FileAppender will only close the input stream to file, but leave 
the pipe's stdout/stderr open, following writting operation in executor side 
may be hung. 

need to close the inputStream in FileAppender ?


> Close the inputStream in FileAppender when writing the logs failure
> ---
>
> Key: SPARK-35027
> URL: https://issues.apache.org/jira/browse/SPARK-35027
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.1
>Reporter: Jack Hu
>Priority: Major
>
> In a Spark cluster, the ExecutorRunner uses FileAppender to redirect the 
> stdout/stderr of executors to files. When writing fails for some reason 
> (e.g., disk full), the FileAppender only closes the stream to the file but 
> leaves the pipe's stdout/stderr open, so subsequent write operations on the 
> executor side may hang. 
> Should the inputStream in FileAppender be closed as well?






[jira] [Created] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure

2021-04-12 Thread Jack Hu (Jira)
Jack Hu created SPARK-35027:
---

 Summary: Close the inputStream in FileAppender when writing the 
logs failure
 Key: SPARK-35027
 URL: https://issues.apache.org/jira/browse/SPARK-35027
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.1.1
Reporter: Jack Hu


The ExecutorRunner uses FileAppender to redirect the stdout/stderr of executors 
to files. When writing fails for some reason (e.g., disk full), the FileAppender 
only closes the stream to the file but leaves the pipe's stdout/stderr open, so 
subsequent write operations on the executor side may hang. 

Should the inputStream in FileAppender be closed as well?
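
A minimal sketch of the kind of change being asked for (illustrative only; the 
names below are not the actual FileAppender code): on failure, close the input 
stream from the pipe as well, so the writing process does not block on a full 
pipe buffer.
{code:java}
import java.io.{InputStream, OutputStream}

// Illustrative only, not the real FileAppender: on failure, close the stream
// to the file AND the input stream (the pipe from the child process).
def appendStreamToFile(in: InputStream, out: OutputStream): Unit = {
  val buf = new Array[Byte](8192)
  try {
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = in.read(buf)
    }
  } finally {
    try out.close() catch { case _: Exception => }
    // This is the additional close the ticket asks for:
    try in.close() catch { case _: Exception => }
  }
}
{code}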






[jira] [Commented] (SPARK-21480) Memory leak in org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult

2017-07-20 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094351#comment-16094351
 ] 

Jack Hu commented on SPARK-21480:
-

The issue seems to be resolved in the latest Hive: 
[HIVE-15551|https://issues.apache.org/jira/browse/HIVE-15551]

> Memory leak in 
> org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult
> --
>
> Key: SPARK-21480
> URL: https://issues.apache.org/jira/browse/SPARK-21480
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Jack Hu
>
> There is a memory leak in Hive with MySQL in 
> {{org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult}}, 
> which creates SQL statements but does not close them.
> Here is a simple way to recreate it:
> {code:java}
> val port = 1
> val stream = ssc.socketTextStream("host", port).map(x => 
> (x,x)).updateStateByKey(
>   (inputs : Seq[String], s : Option[String]) => 
> inputs.lastOption.orElse(s)
>  )
> stream.foreachRDD((rdd, t) => {
>  
> hiveContext.sparkSession.createDataFrame(rdd).write.mode("overwrite").saveAsTable("t")
> })
> {code}
> Here are the Hive settings
> {code}
> hive.metastore.warehouse.dir=file:///user/hive/warehouse
> javax.jdo.option.ConnectionURL=jdbc:mysql://ip:3306/hive
> spark.sql.warehouse.dir=file:///user/hive/warehouse
> javax.jdo.option.ConnectionDriveName=com.mysql.jdbc.Driver
> javax.jdo.option.ConnectionUserName=hive
> javax.jdo.option.ConnectionPassword=hive
> hive.exec.dynamic.partition.mode=nonstrict
> {code}
> After executing for a while, there are many instances of 
> {{com.mysql.jdbc.JDBC42ResultSet}} and {{com.mysql.jdbc.StatementImpl}}, and 
> they keep increasing.
> After attaching a debugger, we found that the statements created in 
> {{org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult}} are 
> not closed
> {code:java}
>  private void executeNoResult(final String queryText) throws SQLException {
> JDOConnection jdoConn = pm.getDataStoreConnection();
> boolean doTrace = LOG.isDebugEnabled();
> try {
>   long start = doTrace ? System.nanoTime() : 0;
>   
> ((Connection)jdoConn.getNativeConnection()).createStatement().execute(queryText);
>   timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0);
> } finally {
>   jdoConn.close(); // We must release the connection before we call other 
> pm methods.
> }
>   }
> {code}
> The reference call stack is 
> {code:java}
> at com.mysql.jdbc.JDBC42ResultSet.<init>(JDBC42ResultSet.java:44)
>   at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown 
> Source)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>   at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
>   at com.mysql.jdbc.ResultSetImpl.getInstance(ResultSetImpl.java:319)
>   at com.mysql.jdbc.MysqlIO.buildResultSetWithUpdates(MysqlIO.java:3114)
>   at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3014)
>   at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2280)
>   at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
>   at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2546)
>   at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2504)
>   at com.mysql.jdbc.StatementImpl.executeInternal(StatementImpl.java:840)
>   at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:740)
>   at com.jolbox.bonecp.StatementHandle.execute(StatementHandle.java:254)
>   at 
> org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult(MetaStoreDirectSql.java:233)
>   at 
> org.apache.hadoop.hive.metastore.MetaStoreDirectSql.doDbSpecificInitializationsBeforeQuery(MetaStoreDirectSql.java:222)
>   at 
> org.apache.hadoop.hive.metastore.MetaStoreDirectSql.getDatabase(MetaStoreDirectSql.java:263)
>   at 
> org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:578)
>   at 
> org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:575)
>   at 
> org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2385)
>   at 
> org.apache.hadoop.hive.metastore.ObjectStore.getDatabaseInternal(ObjectStore.java:575)
>   at 
> org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:559)
>   at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114)
>   at 

[jira] [Created] (SPARK-21480) Memory leak in org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult

2017-07-20 Thread Jack Hu (JIRA)
Jack Hu created SPARK-21480:
---

 Summary: Memory leak in 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult
 Key: SPARK-21480
 URL: https://issues.apache.org/jira/browse/SPARK-21480
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.1
Reporter: Jack Hu


There is a memory leak in Hive with MySQL in 
{{org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult}}, which 
creates SQL statements but does not close them.

Here is a simple way to recreate it:

{code:java}
val port = 1
val stream = ssc.socketTextStream("host", port).map(x => 
(x,x)).updateStateByKey(
  (inputs : Seq[String], s : Option[String]) => inputs.lastOption.orElse(s)
 )
stream.foreachRDD((rdd, t) => {
 
hiveContext.sparkSession.createDataFrame(rdd).write.mode("overwrite").saveAsTable("t")
})
{code}

Here are the Hive settings

{code}
hive.metastore.warehouse.dir=file:///user/hive/warehouse
javax.jdo.option.ConnectionURL=jdbc:mysql://ip:3306/hive
spark.sql.warehouse.dir=file:///user/hive/warehouse
javax.jdo.option.ConnectionDriveName=com.mysql.jdbc.Driver
javax.jdo.option.ConnectionUserName=hive
javax.jdo.option.ConnectionPassword=hive
hive.exec.dynamic.partition.mode=nonstrict
{code}

After executing for a while, there are many instances of 
{{com.mysql.jdbc.JDBC42ResultSet}} and {{com.mysql.jdbc.StatementImpl}}, and 
they keep increasing.

After attaching a debugger, we found that the statements created in 
{{org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult}} are not 
closed:

{code:java}
 private void executeNoResult(final String queryText) throws SQLException {
JDOConnection jdoConn = pm.getDataStoreConnection();
boolean doTrace = LOG.isDebugEnabled();
try {
  long start = doTrace ? System.nanoTime() : 0;
  
((Connection)jdoConn.getNativeConnection()).createStatement().execute(queryText);
  timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0);
} finally {
  jdoConn.close(); // We must release the connection before we call other 
pm methods.
}
  }
{code}
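
As an aside, a sketch (not the Hive source; the actual fix appears to have 
landed upstream as HIVE-15551, mentioned in the comment above) of the same 
operation with the statement closed:
{code:java}
import java.sql.Connection

// Sketch only: the Statement obtained from the native connection must be
// closed as well, otherwise each call leaks a StatementImpl (and its
// ResultSet) on the MySQL driver side.
def executeNoResultFixed(conn: Connection, queryText: String): Unit = {
  val stmt = conn.createStatement()
  try {
    stmt.execute(queryText)
  } finally {
    stmt.close() // missing in the reported code path
  }
}
{code}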

The reference call stack is 


{code:java}
at com.mysql.jdbc.JDBC42ResultSet.<init>(JDBC42ResultSet.java:44)
at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown 
Source)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
at com.mysql.jdbc.ResultSetImpl.getInstance(ResultSetImpl.java:319)
at com.mysql.jdbc.MysqlIO.buildResultSetWithUpdates(MysqlIO.java:3114)
at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3014)
at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2280)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2546)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2504)
at com.mysql.jdbc.StatementImpl.executeInternal(StatementImpl.java:840)
at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:740)
at com.jolbox.bonecp.StatementHandle.execute(StatementHandle.java:254)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult(MetaStoreDirectSql.java:233)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.doDbSpecificInitializationsBeforeQuery(MetaStoreDirectSql.java:222)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.getDatabase(MetaStoreDirectSql.java:263)
at 
org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:578)
at 
org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:575)
at 
org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2385)
at 
org.apache.hadoop.hive.metastore.ObjectStore.getDatabaseInternal(ObjectStore.java:575)
at 
org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:559)
at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114)
at com.sun.proxy.$Proxy9.getDatabase(Unknown Source)
at 
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_database_core(HiveMetaStore.java:956)
at 
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_database(HiveMetaStore.java:930)
at sun.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at 

[jira] [Updated] (SPARK-16413) The JDBC UI does not show the job id when spark.sql.thriftServer.incrementalCollect=true

2016-07-07 Thread Jack Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu updated SPARK-16413:

Attachment: no_job_id.png

ThriftServer2 does not show the "job id"

> The JDBC UI does not show the job id when 
> spark.sql.thriftServer.incrementalCollect=true
> 
>
> Key: SPARK-16413
> URL: https://issues.apache.org/jira/browse/SPARK-16413
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.2
>Reporter: Jack Hu
>  Labels: SQL, ThriftServer2, UI
> Attachments: no_job_id.png
>
>
> When {{spark.sql.thriftServer.incrementalCollect=true}} is set, the Hive Thrift 
> Server2 UI cannot show the "job id" in the SQL statistics. 
> To reproduce, run
> {code}
> spark-submit --conf spark.sql.thriftServer.incrementalCollect=true --class 
> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
> {code}
> then submit some sql.






[jira] [Created] (SPARK-16413) The JDBC UI does not show the job id when spark.sql.thriftServer.incrementalCollect=true

2016-07-07 Thread Jack Hu (JIRA)
Jack Hu created SPARK-16413:
---

 Summary: The JDBC UI does not show the job id when 
spark.sql.thriftServer.incrementalCollect=true
 Key: SPARK-16413
 URL: https://issues.apache.org/jira/browse/SPARK-16413
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.2
Reporter: Jack Hu


When {{spark.sql.thriftServer.incrementalCollect=true}} is set, the Hive Thrift 
Server2 UI cannot show the "job id" in the SQL statistics. 

To reproduce, run
{code}
spark-submit --conf spark.sql.thriftServer.incrementalCollect=true --class 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
{code}

then submit some sql.
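
For example (a hypothetical follow-up step; host, port, credentials, and table 
name are placeholders, and the Hive JDBC driver must be on the classpath), a 
query can be submitted through the Thrift server's JDBC endpoint:
{code}
import java.sql.DriverManager

// Placeholders: adjust the URL, credentials, and table to your setup.
val conn = DriverManager.getConnection(
  "jdbc:hive2://localhost:10000/default", "user", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT count(*) FROM some_table")
while (rs.next()) println(rs.getLong(1))
rs.close(); stmt.close(); conn.close()
{code}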






[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-01-25 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15116545#comment-15116545
 ] 

Jack Hu commented on SPARK-6847:


Hi [~zsxwing]

Even if the user does not explicitly add a checkpoint after 
{{updateStateByKey}}, this issue will still happen in the following cases:
 # {{updateStateByKey().filter().updateStateByKey()}}
 # {{updateStateByKey().filter().reduceByKeyAndWindow(reduce, inreduce, ...)}}
 # {{reduceByKeyAndWindow(reduce, inreduce, ...).filter().updateStateByKey()}}

If there is no plan to fix this issue, maybe a workaround or warning should be 
given to users for such usage. 
It will be very hard to find the real cause if the application is complicated. 
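
To make case 1 above concrete, a minimal self-contained sketch (illustrative 
only; the host, port, and update function are assumptions, not from the issue):
{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative sketch of case 1: two stateful DStreams chained through a
// filter, with no explicit checkpoint() call in between.
val ssc = new StreamingContext(new SparkConf().setAppName("sketch"), Seconds(2))
ssc.checkpoint("checkpoint")

val update = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))

val input = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))
val d1 = input.updateStateByKey(update)
val d2 = d1.filter(_._2 > 0).updateStateByKey(update)
d2.foreachRDD(rdd => println(rdd.count()))

// Workaround mentioned later in this issue: also add an output on d1 so that
// its checkpoint actually materializes, e.g.
// d1.foreachRDD(rdd => rdd.foreach(_ => ()))

ssc.start()
ssc.awaitTermination()
{code}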


> Stack overflow on updateStateByKey which followed by a dstream with 
> checkpoint set
> --
>
> Key: SPARK-6847
> URL: https://issues.apache.org/jira/browse/SPARK-6847
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Jack Hu
>  Labels: StackOverflowError, Streaming
>
> The issue happens with the following sample code: uses {{updateStateByKey}} 
> followed by a {{map}} with checkpoint interval 10 seconds
> {code}
> val sparkConf = new SparkConf().setAppName("test")
> val streamingContext = new StreamingContext(sparkConf, Seconds(10))
> streamingContext.checkpoint("""checkpoint""")
> val source = streamingContext.socketTextStream("localhost", )
> val updatedResult = source.map(
> (1,_)).updateStateByKey(
> (newlist : Seq[String], oldstate : Option[String]) => 
> newlist.headOption.orElse(oldstate))
> updatedResult.map(_._2)
> .checkpoint(Seconds(10))
> .foreachRDD((rdd, t) => {
>   println("Deep: " + rdd.toDebugString.split("\n").length)
>   println(t.toString() + ": " + rdd.collect.length)
> })
> streamingContext.start()
> streamingContext.awaitTermination()
> {code}
> From the output, we can see that the dependency chain keeps growing over 
> time, the {{updateStateByKey}} never gets checkpointed, and finally the 
> stack overflow happens. 
> Note:
> * The rdd in {{updatedResult.map(_._2)}} gets checkpointed in this case, but 
> not the {{updateStateByKey}} 
> * If the {{checkpoint(Seconds(10))}} is removed from the map result 
> ( {{updatedResult.map(_._2)}} ), the stack overflow will not happen






[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-01-24 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114722#comment-15114722
 ] 

Jack Hu commented on SPARK-6847:


Tested on the latest 1.6 branch (f913f7e, [SPARK-12120][PYSPARK] Improve 
exception message when failing to init); the issue still exists.

> Stack overflow on updateStateByKey which followed by a dstream with 
> checkpoint set
> --
>
> Key: SPARK-6847
> URL: https://issues.apache.org/jira/browse/SPARK-6847
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Jack Hu
>  Labels: StackOverflowError, Streaming
>
> The issue happens with the following sample code: uses {{updateStateByKey}} 
> followed by a {{map}} with checkpoint interval 10 seconds
> {code}
> val sparkConf = new SparkConf().setAppName("test")
> val streamingContext = new StreamingContext(sparkConf, Seconds(10))
> streamingContext.checkpoint("""checkpoint""")
> val source = streamingContext.socketTextStream("localhost", )
> val updatedResult = source.map(
> (1,_)).updateStateByKey(
> (newlist : Seq[String], oldstate : Option[String]) => 
> newlist.headOption.orElse(oldstate))
> updatedResult.map(_._2)
> .checkpoint(Seconds(10))
> .foreachRDD((rdd, t) => {
>   println("Deep: " + rdd.toDebugString.split("\n").length)
>   println(t.toString() + ": " + rdd.collect.length)
> })
> streamingContext.start()
> streamingContext.awaitTermination()
> {code}
> From the output, we can see that the dependency chain keeps growing over 
> time, the {{updateStateByKey}} never gets checkpointed, and finally the 
> stack overflow happens. 
> Note:
> * The rdd in {{updatedResult.map(_._2)}} gets checkpointed in this case, but 
> not the {{updateStateByKey}} 
> * If the {{checkpoint(Seconds(10))}} is removed from the map result 
> ( {{updatedResult.map(_._2)}} ), the stack overflow will not happen






[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-01-21 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15111849#comment-15111849
 ] 

Jack Hu commented on SPARK-6847:


Hi [~zsxwing]

I just tested a simple case with 1.6; it still exists:
{code}
batch interval = 2 seconds
source.updateStateByKey(func).map(f).checkpoint(2 seconds)
{code}

> Stack overflow on updateStateByKey which followed by a dstream with 
> checkpoint set
> --
>
> Key: SPARK-6847
> URL: https://issues.apache.org/jira/browse/SPARK-6847
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Jack Hu
>  Labels: StackOverflowError, Streaming
>
> The issue happens with the following sample code: uses {{updateStateByKey}} 
> followed by a {{map}} with checkpoint interval 10 seconds
> {code}
> val sparkConf = new SparkConf().setAppName("test")
> val streamingContext = new StreamingContext(sparkConf, Seconds(10))
> streamingContext.checkpoint("""checkpoint""")
> val source = streamingContext.socketTextStream("localhost", )
> val updatedResult = source.map(
> (1,_)).updateStateByKey(
> (newlist : Seq[String], oldstate : Option[String]) => 
> newlist.headOption.orElse(oldstate))
> updatedResult.map(_._2)
> .checkpoint(Seconds(10))
> .foreachRDD((rdd, t) => {
>   println("Deep: " + rdd.toDebugString.split("\n").length)
>   println(t.toString() + ": " + rdd.collect.length)
> })
> streamingContext.start()
> streamingContext.awaitTermination()
> {code}
> From the output, we can see that the dependency chain keeps growing over 
> time, the {{updateStateByKey}} never gets checkpointed, and finally the 
> stack overflow happens. 
> Note:
> * The rdd in {{updatedResult.map(_._2)}} gets checkpointed in this case, but 
> not the {{updateStateByKey}} 
> * If the {{checkpoint(Seconds(10))}} is removed from the map result 
> ( {{updatedResult.map(_._2)}} ), the stack overflow will not happen






[jira] [Created] (SPARK-11749) Duplicate creating the RDD in file stream when recovering from checkpoint data

2015-11-15 Thread Jack Hu (JIRA)
Jack Hu created SPARK-11749:
---

 Summary: Duplicate creating the RDD in file stream when recovering 
from checkpoint data
 Key: SPARK-11749
 URL: https://issues.apache.org/jira/browse/SPARK-11749
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.0, 1.5.2
Reporter: Jack Hu


I have a case that monitors an HDFS folder, enriches the incoming data from that 
folder via different tables (about 15 reference tables), and writes to different 
Hive tables after some operations. 

The code is like this:
{code}
val txt = 
ssc.textFileStream(folder).map(toKeyValuePair).reduceByKey(removeDuplicates)
val refTable1 = 
ssc.textFileStream(refSource1).map(parse(_)).updateStateByKey(...)
txt.join(refTable1).map(..).reduceByKey(...).foreachRDD(
  rdd => {
 // insert into hive table
  }
)

val refTable2 = 
ssc.textFileStream(refSource2).map(parse(_)).updateStateByKey(...)
txt.join(refTable2).map(..).reduceByKey(...).foreachRDD(
  rdd => {
 // insert into hive table
  }
)

/// more refTables in following code
{code}
 
The {{batchInterval}} of this application is set to *30 seconds*, the 
checkpoint interval is set to *10 minutes*, and every batch in {{txt}} has *60 
files*.

After recovering from the checkpoint data, I can see lots of logs about creating 
the RDDs in the file stream: the RDD in each batch of the file stream was 
recreated *15 times*, and it took about *5 minutes* to create so many file RDDs. 
During this period, *10K+ broadcasts* had been created and used almost all the 
block manager space. 

After some investigation, we found that {{DStream.restoreCheckpointData}} is 
invoked for each output ({{DStream.foreachRDD}} in this case), and there is no 
flag to indicate that this {{DStream}} has already been restored, so the RDDs 
in the file stream were recreated. 

Suggest adding a flag to control the restore process to avoid the duplicated 
work.
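
A minimal sketch of the kind of guard suggested (a simplified model, not the 
actual DStream code; the class and method names here are illustrative):
{code}
// Simplified model of the suggested fix: each stream remembers whether it has
// already been restored, so one restore call per output operation no longer
// re-creates the RDDs from the checkpointed data.
class CheckpointedStream(parents: Seq[CheckpointedStream]) {
  private var restoredFromCheckpointData = false

  def restoreCheckpointData(): Unit = {
    if (!restoredFromCheckpointData) {
      restoredFromCheckpointData = true
      // ... re-create the RDDs from the checkpointed file names here ...
      parents.foreach(_.restoreCheckpointData())
    }
  }
}
{code}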







[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-10-09 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951593#comment-14951593
 ] 

Jack Hu commented on SPARK-6847:


Hi [~glyton.camilleri]
You can check whether there are two DStreams in the DAG that need to be 
checkpointed (updateStateByKey, reduceByKeyAndWindow); if yes, you can work 
around this by adding an output for the earlier DStream that needs to be 
checkpointed. 

{code}
val d1 = input.updateStateByKey(func)
val d2 = d1.map(...).updateStateByKey(func)
d2.foreachRDD(rdd => print(rdd.count))
// workaround for the stack overflow described in this JIRA
d1.foreachRDD(rdd => rdd.foreach(_ => Unit))
{code}


> Stack overflow on updateStateByKey which followed by a dstream with 
> checkpoint set
> --
>
> Key: SPARK-6847
> URL: https://issues.apache.org/jira/browse/SPARK-6847
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Jack Hu
>  Labels: StackOverflowError, Streaming
>
> The issue happens with the following sample code: uses {{updateStateByKey}} 
> followed by a {{map}} with checkpoint interval 10 seconds
> {code}
> val sparkConf = new SparkConf().setAppName("test")
> val streamingContext = new StreamingContext(sparkConf, Seconds(10))
> streamingContext.checkpoint("""checkpoint""")
> val source = streamingContext.socketTextStream("localhost", )
> val updatedResult = source.map(
> (1,_)).updateStateByKey(
> (newlist : Seq[String], oldstate : Option[String]) => 
> newlist.headOption.orElse(oldstate))
> updatedResult.map(_._2)
> .checkpoint(Seconds(10))
> .foreachRDD((rdd, t) => {
>   println("Deep: " + rdd.toDebugString.split("\n").length)
>   println(t.toString() + ": " + rdd.collect.length)
> })
> streamingContext.start()
> streamingContext.awaitTermination()
> {code}
> From the output, we can see that the dependency chain keeps growing over 
> time, the {{updateStateByKey}} never gets checkpointed, and finally the 
> stack overflow happens. 
> Note:
> * The rdd in {{updatedResult.map(_._2)}} gets checkpointed in this case, but 
> not the {{updateStateByKey}} 
> * If the {{checkpoint(Seconds(10))}} is removed from the map result 
> ( {{updatedResult.map(_._2)}} ), the stack overflow will not happen






[jira] [Updated] (SPARK-7624) Task scheduler delay is increasing time over time in spark local mode

2015-05-14 Thread Jack Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu updated SPARK-7624:
---
Description: 
I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. 
It receives JSON strings from a socket at a rate of 50 events per second and 
runs well in the first 6 hours (although the minor GC count per minute keeps 
increasing). After that, I can see that the scheduler delay of every task 
significantly increases from 10 ms to 100 ms; after 10 hours of running, the 
task delay is about 800 ms and CPU usage has also increased from 2% to 30%. 
This causes the streaming job to no longer finish within one batch interval 
(5 seconds). I dumped the Java heap after 16 hours and can see there are about 
20 {{org.apache.spark.scheduler.local.ReviveOffers}} objects in 
{{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. Then I checked the code 
and saw only one place that may put {{ReviveOffers}} into the Akka 
{{LightArrayRevolverScheduler}}: {{LocalActor::reviveOffers}}
{code}
def reviveOffers() {
  val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
  val tasks = scheduler.resourceOffers(offers).flatten
  for (task <- tasks) {
    freeCores -= scheduler.CPUS_PER_TASK
    executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
      task.name, task.serializedTask)
  }

  if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
    // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
    context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
  }
}
{code}

I removed the last three lines in this method (the whole {{if}} block, which was 
introduced by https://issues.apache.org/jira/browse/SPARK-4939), and it worked 
smoothly after 20 hours of running; the scheduler delay stayed at about 10 ms 
all the time. So there must be some condition under which {{ReviveOffers}} gets 
scheduled repeatedly? I am not sure why this happens, but I feel that this is 
the root cause of this issue. 

My Spark settings:
# Memory: 3G
# CPU: 8 cores 
# Streaming batch interval: 5 seconds

Here is my streaming code:
{code}
val input = ssc.socketTextStream(
    hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions(
    /// parse the json to Order
    Order(_), preservePartitioning = true)
val mresult = input.map(
    v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache()
val tempr = mresult.window(
    Seconds(firstStageWindowSize), 
    Seconds(firstStageWindowSize)
  ).transform(
    rdd => rdd.union(rdd).union(rdd).union(rdd)
  )
tempr.count.print
tempr.cache().foreachRDD((rdd, t) => {
    for (i <- 1 to 5) {
      val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
      println("T: " + t + ": " + c)
    }
  })
{code}


Updated at 2015-05-15:
I printed how many times the suspect lines in {{LocalActor::reviveOffers}} were 
scheduled: {color:red}1685343501{color} times after 18 hours of running.

  was:
I am running a simple spark streaming program with spark 1.3.1 in local mode, 
it receives json string from a socket with rate 50 events per second, it can 
run well in first 6 hours (although the minor gc count per minute is increasing 
all the time), after that, i can see that the scheduler delay in every task is 
significant increased from 10 ms to 100 ms, after 10 hours running, the task 
delay is about 800 ms and cpu is also increased from 2% to 30%. This causes the 
steaming job can not finish in one batch interval (5 seconds). I dumped the 
java memory after 16 hours and can see there are about 20 
{{org.apache.spark.scheduler.local.ReviveOffers}} objects in 
{{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. Then i checked the code 
and see only one place may put the {{ReviveOffers}} to akka 
{{LightArrayRevolverScheduler}}: the {{LocalActor::reviveOffers}}
{code}
 def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, 
freeCores))
val tasks = scheduler.resourceOffers(offers).flatten
for (task - tasks) {
  freeCores -= scheduler.CPUS_PER_TASK
  executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber 
= task.attemptNumber,
task.name, task.serializedTask)
}

if (tasks.isEmpty  scheduler.activeTaskSets.nonEmpty) {
  // Try to reviveOffer after 1 second, because scheduler may wait for 
locality timeout
  context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
}
}
{code}

I removed the last three lines in this method (the whole {{if}} block, which is 
introduced from https://issues.apache.org/jira/browse/SPARK-4939), it worked 
smooth after 20 hours running, the scheduler delay is 

[jira] [Updated] (SPARK-7624) Task scheduler delay is increasing time over time in spark local mode

2015-05-14 Thread Jack Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu updated SPARK-7624:
---
Description: 
I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. 
It receives JSON strings from a socket at a rate of 50 events per second and 
runs well in the first 6 hours (although the minor GC count per minute keeps 
increasing). After that, I can see that the scheduler delay of every task 
significantly increases from 10 ms to 100 ms; after 10 hours of running, the 
task delay is about 800 ms and CPU usage has also increased from 2% to 30%. 
This causes the streaming job to no longer finish within one batch interval 
(5 seconds). I dumped the Java heap after 16 hours and can see there are about 
20 {{org.apache.spark.scheduler.local.ReviveOffers}} objects in 
{{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. Then I checked the code 
and saw only one place that may put {{ReviveOffers}} into the Akka 
{{LightArrayRevolverScheduler}}: {{LocalActor::reviveOffers}}
{code}
def reviveOffers() {
  val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
  val tasks = scheduler.resourceOffers(offers).flatten
  for (task <- tasks) {
    freeCores -= scheduler.CPUS_PER_TASK
    executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
      task.name, task.serializedTask)
  }

  if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
    // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
    context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
  }
}
{code}

I removed the last three lines in this method (the whole {{if}} block, which was 
introduced by https://issues.apache.org/jira/browse/SPARK-4939), and it worked 
smoothly after 20 hours of running; the scheduler delay stayed at about 10 ms 
all the time. So there must be some condition under which {{ReviveOffers}} gets 
scheduled repeatedly? I am not sure why this happens, but I feel that this is 
the root cause of this issue. 

My Spark settings:
# Memory: 3G
# CPU: 8 cores 
# Streaming batch interval: 5 seconds

Here is my streaming code:
{code}
val input = ssc.socketTextStream(
    hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions(
    /// parse the json to Order
    Order(_), preservePartitioning = true)
val mresult = input.map(
    v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache()
val tempr = mresult.window(
    Seconds(firstStageWindowSize), 
    Seconds(firstStageWindowSize)
  ).transform(
    rdd => rdd.union(rdd).union(rdd).union(rdd)
  )
tempr.count.print
tempr.cache().foreachRDD((rdd, t) => {
    for (i <- 1 to 5) {
      val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
      println("T: " + t + ": " + c)
    }
  })
{code}


Updated at 2015-05-15:
I printed how many times the suspect lines in {{LocalActor::reviveOffers}} were 
scheduled: {color:red}*1685343501*{color} times after 18 hours of running.

  was:
I am running a simple spark streaming program with spark 1.3.1 in local mode, 
it receives json string from a socket with rate 50 events per second, it can 
run well in first 6 hours (although the minor gc count per minute is increasing 
all the time), after that, i can see that the scheduler delay in every task is 
significant increased from 10 ms to 100 ms, after 10 hours running, the task 
delay is about 800 ms and cpu is also increased from 2% to 30%. This causes the 
steaming job can not finish in one batch interval (5 seconds). I dumped the 
java memory after 16 hours and can see there are about 20 
{{org.apache.spark.scheduler.local.ReviveOffers}} objects in 
{{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. Then i checked the code 
and see only one place may put the {{ReviveOffers}} to akka 
{{LightArrayRevolverScheduler}}: the {{LocalActor::reviveOffers}}
{code}
  def reviveOffers() {
    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
    val tasks = scheduler.resourceOffers(offers).flatten
    for (task <- tasks) {
      freeCores -= scheduler.CPUS_PER_TASK
      executor.launchTask(executorBackend, taskId = task.taskId,
        attemptNumber = task.attemptNumber, task.name, task.serializedTask)
    }

    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
      // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
      context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
    }
  }
{code}

I removed the last three lines of this method (the whole {{if}} block, which was 
introduced by https://issues.apache.org/jira/browse/SPARK-4939), and it ran 
smoothly for 20 hours, with the scheduler delay 

[jira] [Updated] (SPARK-7624) Task scheduler delay is increasing time over time in spark local mode

2015-05-13 Thread Jack Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu updated SPARK-7624:
---
Description: 
I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. 
It receives JSON strings from a socket at a rate of 50 events per second. It 
runs well for the first 6 hours (although the minor GC count per minute keeps 
increasing), but after that the scheduler delay of every task increases 
significantly, from 10 ms to 100 ms; after 10 hours of running the task delay 
is about 800 ms and CPU usage has also grown from 2% to 30%. As a result the 
streaming job cannot finish within one batch interval (5 seconds). I dumped the 
Java heap after 16 hours and can see there are about 20 
{{org.apache.spark.scheduler.local.ReviveOffers}} objects in 
{{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. I then checked the code 
and found only one place that may put {{ReviveOffers}} into the akka 
{{LightArrayRevolverScheduler}}: {{LocalActor::reviveOffers}}
{code}
  def reviveOffers() {
    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
    val tasks = scheduler.resourceOffers(offers).flatten
    for (task <- tasks) {
      freeCores -= scheduler.CPUS_PER_TASK
      executor.launchTask(executorBackend, taskId = task.taskId,
        attemptNumber = task.attemptNumber, task.name, task.serializedTask)
    }

    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
      // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
      context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
    }
  }
{code}

I removed the last three lines of this method (the whole {{if}} block, which was 
introduced by https://issues.apache.org/jira/browse/SPARK-4939), and it ran 
smoothly for 20 hours, with the scheduler delay staying around 10 ms the whole 
time. So there must be some condition under which {{ReviveOffers}} gets 
scheduled more than once. I am not sure why this happens, but I believe it is 
the root cause of this issue.

My spark settings:
# Memory: 3G
# CPU: 8 cores 
# Streaming Batch interval: 5 seconds.  

Here is my streaming code:
{code}
val input = ssc.socketTextStream(
    hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions(
    // parse the json to Order
    Order(_), preservePartitioning = true)
val mresult = input.map(
    v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache()
val tempr = mresult.window(
    Seconds(firstStageWindowSize),
    Seconds(firstStageWindowSize)
  ).transform(
    rdd => rdd.union(rdd).union(rdd).union(rdd)
  )
tempr.count.print
tempr.cache().foreachRDD((rdd, t) => {
  for (i <- 1 to 5) {
    val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
    println("T: " + t + ": " + c)
  }
})
{code}

  was:
I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. 
It receives JSON strings from a socket at a rate of 50 events per second. It 
runs well for the first 6 hours (although the minor GC count per minute keeps 
increasing), but after that the scheduler delay of every task increases 
significantly, from 10 ms to 100 ms; after 10 hours of running the task delay 
is about 800 ms and CPU usage has also grown from 2% to 30%. As a result the 
streaming job cannot finish within one batch interval (5 seconds). I dumped the 
Java heap after 16 hours and can see there are about 20 
{{org.apache.spark.scheduler.local.ReviveOffers}} objects in 
{{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. I then checked the code 
and found only one place that may put {{ReviveOffers}} into the akka 
{{LightArrayRevolverScheduler}}: {{LocalActor::reviveOffers}}
{code}
  def reviveOffers() {
    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
    val tasks = scheduler.resourceOffers(offers).flatten
    for (task <- tasks) {
      freeCores -= scheduler.CPUS_PER_TASK
      executor.launchTask(executorBackend, taskId = task.taskId,
        attemptNumber = task.attemptNumber, task.name, task.serializedTask)
    }

    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
      // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
      context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
    }
  }
{code}

I removed the last three lines of this method (the whole {{if}} block, which was 
introduced by https://issues.apache.org/jira/browse/SPARK-4939), and it ran 
smoothly for 20 hours, with the scheduler delay staying around 10 ms the whole 
time. So there must be some condition under which {{ReviveOffers}} gets 
scheduled more than once. I am not sure why this happens, but I believe it is 
the root cause of this issue.

My spark settings:
# Memory: 

[jira] [Created] (SPARK-7624) Task scheduler delay is increasing time over time in spark local mode

2015-05-13 Thread Jack Hu (JIRA)
Jack Hu created SPARK-7624:
--

 Summary: Task scheduler delay is increasing time over time in 
spark local mode
 Key: SPARK-7624
 URL: https://issues.apache.org/jira/browse/SPARK-7624
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.3.1
Reporter: Jack Hu


I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. 
It receives JSON strings from a socket at a rate of 50 events per second. It 
runs well for the first 6 hours (although the minor GC count per minute keeps 
increasing), but after that the scheduler delay of every task increases 
significantly, from 10 ms to 100 ms; after 10 hours of running the task delay 
is about 800 ms and CPU usage has also grown from 2% to 30%. As a result the 
streaming job cannot finish within one batch interval (5 seconds). I dumped the 
Java heap after 16 hours and can see there are about 20 
{{org.apache.spark.scheduler.local.ReviveOffers}} objects in 
{{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. I then checked the code 
and found only one place that may put {{ReviveOffers}} into the akka 
{{LightArrayRevolverScheduler}}: {{LocalActor::reviveOffers}}
{code}
  def reviveOffers() {
    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
    val tasks = scheduler.resourceOffers(offers).flatten
    for (task <- tasks) {
      freeCores -= scheduler.CPUS_PER_TASK
      executor.launchTask(executorBackend, taskId = task.taskId,
        attemptNumber = task.attemptNumber, task.name, task.serializedTask)
    }

    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
      // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
      context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
    }
  }
{code}

I removed the last three lines of this method (the whole {{if}} block, which was 
introduced by https://issues.apache.org/jira/browse/SPARK-4939), and it ran 
smoothly for 20 hours, with the scheduler delay staying around 10 ms the whole 
time. So there must be some condition under which {{ReviveOffers}} gets 
scheduled more than once. I am not sure why this happens, but I believe it is 
the root cause of this issue.

My spark settings:
# Memory: 3G
# CPU: 8 cores 
# Streaming Batch interval: 5 seconds.  

Here is my streaming code:
{code}
val input = ssc.socketTextStream(
    hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions(
    // parse the json to Order
    Order(_), preservePartitioning = true)
val mresult = input.map(
    v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache()
val tempr = mresult.window(
    Seconds(firstStageWindowSize),
    Seconds(firstStageWindowSize)
  ).transform(
    rdd => rdd.union(rdd).union(rdd).union(rdd)
  )
tempr.count.print
tempr.cache().foreachRDD((rdd, t) => {
  for (i <- 1 to 5) {
    val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
    println("T: " + t + ": " + c)
  }
})
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7183) Memory leak in netty shuffle with spark standalone cluster

2015-05-07 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14532108#comment-14532108
 ] 

Jack Hu commented on SPARK-7183:


Hi, [~sowen]

Do we plan to add this to 1.3.x, if there is any plan for another minor release 
of the 1.3 line, like 1.3.2?


 Memory leak in netty shuffle with spark standalone cluster
 --

 Key: SPARK-7183
 URL: https://issues.apache.org/jira/browse/SPARK-7183
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.3.0
Reporter: Jack Hu
Assignee: Liang-Chi Hsieh
  Labels: memory-leak, netty, shuffle
 Fix For: 1.4.0


 There is a slow leak in netty shuffle with a Spark standalone cluster in 
 {{TransportRequestHandler.streamIds}}.
 In a Spark cluster there are some reusable netty connections between two block 
 managers, used to get/send blocks between workers/drivers. On the server side 
 these connections are handled by 
 {{org.apache.spark.network.server.TransportRequestHandler}}. This handler keeps 
 track of all the stream ids negotiated via RPC whenever shuffle data needs to be 
 transferred between the two block managers; the set of stream ids keeps growing 
 and never gets a chance to be cleaned up unless the connection is dropped (which 
 seems never to happen in a normal run).
 Here are some detailed logs of this {{TransportRequestHandler}} (note: we added 
 a log statement that prints the total size of 
 {{TransportRequestHandler.streamIds}}; the log line is "Current set size is N of 
 org.apache.spark.network.server.TransportRequestHandler@ADDRESS", and this set 
 size keeps increasing in our test)
 {quote}
 15/04/22 21:00:16 DEBUG TransportServer: Shuffle server started on port :46288
 15/04/22 21:00:16 INFO NettyBlockTransferService: Server created on 46288
 15/04/22 21:00:31 INFO TransportRequestHandler: Created 
 TransportRequestHandler 
 org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
 15/04/22 21:00:32 TRACE MessageDecoder: Received message RpcRequest: 
 RpcRequest\{requestId=6655045571437304938, message=[B@59778678\}
 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Received request: 
 OpenBlocks\{appId=app-20150422210016-, execId=driver, 
 blockIds=[broadcast_1_piece0]}
 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Registered streamId 
 1387459488000 with 1 buffers
 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result 
 RpcResponse\{requestId=6655045571437304938, response=[B@d2840b\} to client 
 /10.111.7.150:33802
 15/04/22 21:00:33 TRACE MessageDecoder: Received message ChunkFetchRequest: 
 ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488000, 
 chunkIndex=0}}
 15/04/22 21:00:33 TRACE TransportRequestHandler: Received req from 
 /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488000, 
 chunkIndex=0\}
 15/04/22 21:00:33 INFO TransportRequestHandler: Current set size is 1 of 
 org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
 15/04/22 21:00:33 TRACE OneForOneStreamManager: Removing stream id 
 1387459488000
 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result 
 ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488000, 
 chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 
 lim=3839 cap=3839]}} to client /10.111.7.150:33802
 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: 
 RpcRequest\{requestId=6660601528868866371, message=[B@42bed1b8\}
 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Received request: 
 OpenBlocks\{appId=app-20150422210016-, execId=driver, 
 blockIds=[broadcast_3_piece0]}
 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Registered streamId 
 1387459488001 with 1 buffers
 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result 
 RpcResponse\{requestId=6660601528868866371, response=[B@7fa3fb60\} to client 
 /10.111.7.150:33802
 15/04/22 21:00:34 TRACE MessageDecoder: Received message ChunkFetchRequest: 
 ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488001, 
 chunkIndex=0}}
 15/04/22 21:00:34 TRACE TransportRequestHandler: Received req from 
 /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488001, 
 chunkIndex=0\}
 15/04/22 21:00:34 INFO TransportRequestHandler: Current set size is 2 of 
 org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
 15/04/22 21:00:34 TRACE OneForOneStreamManager: Removing stream id 
 1387459488001
 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result 
 ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488001, 
 chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 
 lim=4277 cap=4277]}} to client /10.111.7.150:33802
 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: 
 RpcRequest\{requestId=8454597410163901330, message=[B@19c673d1\}
 15/04/22 

[jira] [Updated] (SPARK-7183) Memory leak in netty shuffle with spark standalone cluster

2015-04-27 Thread Jack Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu updated SPARK-7183:
---
Summary: Memory leak in netty shuffle with spark standalone cluster  (was: 
Memory leak with netty shuffle with spark standalone cluster)

 Memory leak in netty shuffle with spark standalone cluster
 --

 Key: SPARK-7183
 URL: https://issues.apache.org/jira/browse/SPARK-7183
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.3.0
Reporter: Jack Hu
  Labels: memory-leak, netty, shuffle

 There is a slow leak in netty shuffle with a Spark standalone cluster in 
 {{TransportRequestHandler.streamIds}}.
 In a Spark cluster there are some reusable netty connections between two block 
 managers, used to get/send blocks between workers/drivers. On the server side 
 these connections are handled by 
 {{org.apache.spark.network.server.TransportRequestHandler}}. This handler keeps 
 track of all the stream ids negotiated via RPC whenever shuffle data needs to be 
 transferred between the two block managers; the set of stream ids keeps growing 
 and never gets a chance to be cleaned up unless the connection is dropped (which 
 seems never to happen in a normal run).
 Here are some detailed logs of this {{TransportRequestHandler}} (note: we added 
 a log statement that prints the total size of 
 {{TransportRequestHandler.streamIds}}; the log line is "Current set size is N of 
 org.apache.spark.network.server.TransportRequestHandler@ADDRESS", and this set 
 size keeps increasing in our test)
 {quote}
 15/04/22 21:00:16 DEBUG TransportServer: Shuffle server started on port :46288
 15/04/22 21:00:16 INFO NettyBlockTransferService: Server created on 46288
 15/04/22 21:00:31 INFO TransportRequestHandler: Created 
 TransportRequestHandler 
 org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
 15/04/22 21:00:32 TRACE MessageDecoder: Received message RpcRequest: 
 RpcRequest\{requestId=6655045571437304938, message=[B@59778678\}
 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Received request: 
 OpenBlocks\{appId=app-20150422210016-, execId=driver, 
 blockIds=[broadcast_1_piece0]}
 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Registered streamId 
 1387459488000 with 1 buffers
 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result 
 RpcResponse\{requestId=6655045571437304938, response=[B@d2840b\} to client 
 /10.111.7.150:33802
 15/04/22 21:00:33 TRACE MessageDecoder: Received message ChunkFetchRequest: 
 ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488000, 
 chunkIndex=0}}
 15/04/22 21:00:33 TRACE TransportRequestHandler: Received req from 
 /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488000, 
 chunkIndex=0\}
 15/04/22 21:00:33 INFO TransportRequestHandler: Current set size is 1 of 
 org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
 15/04/22 21:00:33 TRACE OneForOneStreamManager: Removing stream id 
 1387459488000
 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result 
 ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488000, 
 chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 
 lim=3839 cap=3839]}} to client /10.111.7.150:33802
 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: 
 RpcRequest\{requestId=6660601528868866371, message=[B@42bed1b8\}
 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Received request: 
 OpenBlocks\{appId=app-20150422210016-, execId=driver, 
 blockIds=[broadcast_3_piece0]}
 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Registered streamId 
 1387459488001 with 1 buffers
 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result 
 RpcResponse\{requestId=6660601528868866371, response=[B@7fa3fb60\} to client 
 /10.111.7.150:33802
 15/04/22 21:00:34 TRACE MessageDecoder: Received message ChunkFetchRequest: 
 ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488001, 
 chunkIndex=0}}
 15/04/22 21:00:34 TRACE TransportRequestHandler: Received req from 
 /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488001, 
 chunkIndex=0\}
 15/04/22 21:00:34 INFO TransportRequestHandler: Current set size is 2 of 
 org.apache.spark.network.server.TransportRequestHandler@29a4f3e7
 15/04/22 21:00:34 TRACE OneForOneStreamManager: Removing stream id 
 1387459488001
 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result 
 ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488001, 
 chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 
 lim=4277 cap=4277]}} to client /10.111.7.150:33802
 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: 
 RpcRequest\{requestId=8454597410163901330, message=[B@19c673d1\}
 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Received request: 
 OpenBlocks\{appId=app-20150422210016-, 

[jira] [Commented] (SPARK-6844) Memory leak occurs when register temp table with cache table on

2015-04-23 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14508597#comment-14508597
 ] 

Jack Hu commented on SPARK-6844:


Hi, [~marmbrus]
I mean 1.3.x, like 1.3.2. Master does not seem much different from branch-1.3 
(maybe I am wrong).

 Memory leak occurs when register temp table with cache table on
 ---

 Key: SPARK-6844
 URL: https://issues.apache.org/jira/browse/SPARK-6844
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.3.0
Reporter: Jack Hu
  Labels: Memory, SQL
 Fix For: 1.4.0


 There is a memory leak when registering a temp table with caching on.
 Here is simple code to reproduce the issue:
 {code}
 val sparkConf = new SparkConf().setAppName("LeakTest")
 val sparkContext = new SparkContext(sparkConf)
 val sqlContext = new SQLContext(sparkContext)
 val tableName = "tmp"
 val jsonrdd = sparkContext.textFile("sample.json")
 var loopCount = 1L
 while (true) {
   sqlContext.jsonRDD(jsonrdd).registerTempTable(tableName)
   sqlContext.cacheTable(tableName)
   println("L: " + loopCount + " R: " + sqlContext.sql("select count(*) from tmp").count())
   sqlContext.uncacheTable(tableName)
   loopCount += 1
 }
 {code}
 The cause is that {{InMemoryRelation}} / {{InMemoryColumnarTableScan}} use 
 accumulators ({{InMemoryRelation.batchStats}}, 
 {{InMemoryColumnarTableScan.readPartitions}}, 
 {{InMemoryColumnarTableScan.readBatches}}) to collect some information from 
 partitions or for tests. These accumulators register themselves in a static 
 map in {{Accumulators.originals}} and never get cleaned up.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6844) Memory leak occurs when register temp table with cache table on

2015-04-16 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14497627#comment-14497627
 ] 

Jack Hu commented on SPARK-6844:


Hi, [~marmbrus]

Do we have a plan to port this to 1.3.X branch? 


 Memory leak occurs when register temp table with cache table on
 ---

 Key: SPARK-6844
 URL: https://issues.apache.org/jira/browse/SPARK-6844
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.3.0
Reporter: Jack Hu
  Labels: Memory, SQL
 Fix For: 1.4.0


 There is a memory leak when registering a temp table with caching on.
 Here is simple code to reproduce the issue:
 {code}
 val sparkConf = new SparkConf().setAppName("LeakTest")
 val sparkContext = new SparkContext(sparkConf)
 val sqlContext = new SQLContext(sparkContext)
 val tableName = "tmp"
 val jsonrdd = sparkContext.textFile("sample.json")
 var loopCount = 1L
 while (true) {
   sqlContext.jsonRDD(jsonrdd).registerTempTable(tableName)
   sqlContext.cacheTable(tableName)
   println("L: " + loopCount + " R: " + sqlContext.sql("select count(*) from tmp").count())
   sqlContext.uncacheTable(tableName)
   loopCount += 1
 }
 {code}
 The cause is that {{InMemoryRelation}} / {{InMemoryColumnarTableScan}} use 
 accumulators ({{InMemoryRelation.batchStats}}, 
 {{InMemoryColumnarTableScan.readPartitions}}, 
 {{InMemoryColumnarTableScan.readBatches}}) to collect some information from 
 partitions or for tests. These accumulators register themselves in a static 
 map in {{Accumulators.originals}} and never get cleaned up.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-15 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14495914#comment-14495914
 ] 

Jack Hu commented on SPARK-6847:


I did a little more investigation into this issue. It appears to be a problem 
with operations ({{updateStateByKey}}, {{reduceByKeyAndWindow}} with an 
inverse-reduce function) that must be check-pointed and are followed by an 
operation with a checkpoint (either added manually, as in the code in this 
JIRA's description, or another operation that must be check-pointed), where the 
checkpoint intervals of the two operations are the same (or the downstream 
operation's checkpoint interval equals the batch interval).
The following code will have this issue (assume the batch interval is 2 seconds 
and the default checkpoint interval is 10 seconds):
# {{source.updateStateByKey(func).map(f).checkpoint(10 seconds)}} 
# {{source.updateStateByKey(func).map(f).updateStateByKey(func2)}}
# {{source.updateStateByKey(func).map(f).checkpoint(2 seconds)}} 

These DO NOT have this issue
# {{source.updateStateByKey(func).map(f).checkpoint(4 seconds)}} 
# {{source.updateStateByKey(func).map(f).updateStateByKey(func2).checkpoint(4 
seconds)}}

These sample codes generate an RDD graph containing two RDDs that need to be 
check-pointed.

If the child RDD(s) also need to checkpoint at the same time as the parent, the 
parent will not checkpoint, per {{rdd.doCheckpoint}}. In that case the RDD 
coming from {{updateStateByKey}} in the sample code of this issue never gets 
check-pointed, which leads to the stack overflow ({{updateStateByKey}} needs the 
checkpoint to break the dependency chain of this operation).

If the child RDD(s) are not always check-pointed at the same time the parent 
needs to be, there is a chance that the parent RDD (from {{updateStateByKey}}) 
can complete a checkpoint and break the dependency chain, although the 
checkpoint may be delayed, so no stack overflow happens.

So, for now, our workaround is to set the checkpoint intervals to different 
values when we use operations that must be check-pointed in a streaming 
project. Maybe this is not an easy fix, but I hope we can at least add some 
validation.
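
For readers of this archive, here is a minimal sketch of that workaround, based on the reproduce code in this JIRA; {{sparkConf}} and {{port}} are placeholders, and the 20-second interval is just an example of an interval different from the one used by {{updateStateByKey}}:
{code}
// Sketch of the workaround described above. The downstream checkpoint interval
// (20s) deliberately differs from the interval used by updateStateByKey (10s
// here), so the stateful RDD still gets a chance to be check-pointed.
val streamingContext = new StreamingContext(sparkConf, Seconds(10))
streamingContext.checkpoint("checkpoint")
val source = streamingContext.socketTextStream("localhost", port)
val updatedResult = source.map((1, _)).updateStateByKey(
  (newlist: Seq[String], oldstate: Option[String]) => newlist.headOption.orElse(oldstate))
updatedResult.map(_._2)
  .checkpoint(Seconds(20))  // not 10 seconds
  .foreachRDD((rdd, t) => println(t.toString() + ": " + rdd.collect.length))
streamingContext.start()
streamingContext.awaitTermination()
{code}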

 Stack overflow on updateStateByKey which followed by a dstream with 
 checkpoint set
 --

 Key: SPARK-6847
 URL: https://issues.apache.org/jira/browse/SPARK-6847
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: Jack Hu
  Labels: StackOverflowError, Streaming

 The issue happens with the following sample code, which uses {{updateStateByKey}} 
 followed by a {{map}} with a checkpoint interval of 10 seconds:
 {code}
 val sparkConf = new SparkConf().setAppName("test")
 val streamingContext = new StreamingContext(sparkConf, Seconds(10))
 streamingContext.checkpoint("checkpoint")
 val source = streamingContext.socketTextStream("localhost", )
 val updatedResult = source.map(
     (1, _)).updateStateByKey(
     (newlist: Seq[String], oldstate: Option[String]) => newlist.headOption.orElse(oldstate))
 updatedResult.map(_._2)
   .checkpoint(Seconds(10))
   .foreachRDD((rdd, t) => {
     println("Deep: " + rdd.toDebugString.split("\n").length)
     println(t.toString() + ": " + rdd.collect.length)
   })
 streamingContext.start()
 streamingContext.awaitTermination()
 {code}
 From the output we can see that the dependency chain keeps growing over time, 
 the {{updateStateByKey}} stream never gets check-pointed, and finally the 
 stack overflow happens. 
 Note:
 * The RDD in {{updatedResult.map(_._2)}} does get check-pointed in this case, 
 but the one from {{updateStateByKey}} does not
 * If the {{checkpoint(Seconds(10))}} is removed from the map result 
 ({{updatedResult.map(_._2)}}), the stack overflow does not happen



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-14 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14493804#comment-14493804
 ] 

Jack Hu commented on SPARK-6847:


Hi, [~sowen]

The checkpoint interval cannot be turned down (below 10 seconds) since it must 
be greater than or equal to the batch interval. I will try more checkpoint 
intervals, such as 20 seconds, 30 seconds...

We have a real case with the same problem; it only updates a small set of 
values per key per interval (one event per key per interval).

One observation: the {{updateStateByKey}} stream is automatically checkpointed.

 Stack overflow on updateStateByKey which followed by a dstream with 
 checkpoint set
 --

 Key: SPARK-6847
 URL: https://issues.apache.org/jira/browse/SPARK-6847
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: Jack Hu
  Labels: StackOverflowError, Streaming

 The issue happens with the following sample code, which uses {{updateStateByKey}} 
 followed by a {{map}} with a checkpoint interval of 10 seconds:
 {code}
 val sparkConf = new SparkConf().setAppName("test")
 val streamingContext = new StreamingContext(sparkConf, Seconds(10))
 streamingContext.checkpoint("checkpoint")
 val source = streamingContext.socketTextStream("localhost", )
 val updatedResult = source.map(
     (1, _)).updateStateByKey(
     (newlist: Seq[String], oldstate: Option[String]) => newlist.headOption.orElse(oldstate))
 updatedResult.map(_._2)
   .checkpoint(Seconds(10))
   .foreachRDD((rdd, t) => {
     println("Deep: " + rdd.toDebugString.split("\n").length)
     println(t.toString() + ": " + rdd.collect.length)
   })
 streamingContext.start()
 streamingContext.awaitTermination()
 {code}
 From the output we can see that the dependency chain keeps growing over time, 
 the {{updateStateByKey}} stream never gets check-pointed, and finally the 
 stack overflow happens. 
 Note:
 * The RDD in {{updatedResult.map(_._2)}} does get check-pointed in this case, 
 but the one from {{updateStateByKey}} does not
 * If the {{checkpoint(Seconds(10))}} is removed from the map result 
 ({{updatedResult.map(_._2)}}), the stack overflow does not happen



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-13 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14493517#comment-14493517
 ] 

Jack Hu commented on SPARK-6847:


Here is part of the stack (full stack at: 
https://gist.github.com/jhu-chang/38a6c052aff1d666b785):
{quote}
15/04/14 11:28:20 [Executor task launch worker-1] ERROR 
org.apache.spark.executor.Executor: Exception in task 1.0 in stage 27554.0 (TID 
3801)
java.lang.StackOverflowError
at 
java.io.ObjectStreamClass.setPrimFieldValues(ObjectStreamClass.java:1243)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1984)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
{quote}

 Stack overflow on updateStateByKey which followed by a dstream with 
 checkpoint set
 --

 Key: SPARK-6847
 URL: https://issues.apache.org/jira/browse/SPARK-6847
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: Jack Hu
  Labels: StackOverflowError, Streaming

 The issue happens with the following sample code: uses {{updateStateByKey}} 
 followed by a {{map}} with checkpoint 

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-12 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14491873#comment-14491873
 ] 

Jack Hu commented on SPARK-6847:


Hi, [~sowen]

I tested more cases:
# only changing {{newlist.headOption.orElse(oldstate)}} to {{Some("a")}}: the 
issue still exists
# only changing the streaming batch interval to {{2 seconds}}, keeping 
{{newlist.headOption.orElse(oldstate)}} and the 10-second checkpoint interval: 
the issue does not exist

So this issue may be related to the checkpoint interval and batch interval. 

 Stack overflow on updateStateByKey which followed by a dstream with 
 checkpoint set
 --

 Key: SPARK-6847
 URL: https://issues.apache.org/jira/browse/SPARK-6847
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: Jack Hu
  Labels: StackOverflowError, Streaming

 The issue happens with the following sample code, which uses {{updateStateByKey}} 
 followed by a {{map}} with a checkpoint interval of 10 seconds:
 {code}
 val sparkConf = new SparkConf().setAppName("test")
 val streamingContext = new StreamingContext(sparkConf, Seconds(10))
 streamingContext.checkpoint("checkpoint")
 val source = streamingContext.socketTextStream("localhost", )
 val updatedResult = source.map(
     (1, _)).updateStateByKey(
     (newlist: Seq[String], oldstate: Option[String]) => newlist.headOption.orElse(oldstate))
 updatedResult.map(_._2)
   .checkpoint(Seconds(10))
   .foreachRDD((rdd, t) => {
     println("Deep: " + rdd.toDebugString.split("\n").length)
     println(t.toString() + ": " + rdd.collect.length)
   })
 streamingContext.start()
 streamingContext.awaitTermination()
 {code}
 From the output we can see that the dependency chain keeps growing over time, 
 the {{updateStateByKey}} stream never gets check-pointed, and finally the 
 stack overflow happens. 
 Note:
 * The RDD in {{updatedResult.map(_._2)}} does get check-pointed in this case, 
 but the one from {{updateStateByKey}} does not
 * If the {{checkpoint(Seconds(10))}} is removed from the map result 
 ({{updatedResult.map(_._2)}}), the stack overflow does not happen



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-12 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14491873#comment-14491873
 ] 

Jack Hu edited comment on SPARK-6847 at 4/13/15 3:34 AM:
-

Hi, [~sowen]

I tested more cases:
# only change the {{newlist.headOption.orElse(oldstate)}} to {{Some(a)}}, the 
issue still exists
# only change the streaming batch interval to {{2 seconds}}, keep the  
{{newlist.headOption.orElse(oldstate)}} and checkpoint interval 10 seconds, the 
issue does not exist. 

So this issue may be related to the checkpoint interval and batch interval. 


was (Author: jhu):
Hi, [~sowen]

I tested more cases:
# only change the {{newlist.headOption.orElse(oldstate)}} to {{Some(a)}}, the 
issue still exists
# only change the streaming batch interval to {{2 seconds}}, keep the  
{{newlist.headOption.orElse(oldstate)}} and checkpoint interval 10 seconds, the 
issue does not exist. 

So this issue may related to the checkpoint interval and batch interval. 

 Stack overflow on updateStateByKey which followed by a dstream with 
 checkpoint set
 --

 Key: SPARK-6847
 URL: https://issues.apache.org/jira/browse/SPARK-6847
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: Jack Hu
  Labels: StackOverflowError, Streaming

 The issue happens with the following sample code, which uses {{updateStateByKey}} 
 followed by a {{map}} with a checkpoint interval of 10 seconds:
 {code}
 val sparkConf = new SparkConf().setAppName("test")
 val streamingContext = new StreamingContext(sparkConf, Seconds(10))
 streamingContext.checkpoint("checkpoint")
 val source = streamingContext.socketTextStream("localhost", )
 val updatedResult = source.map(
     (1, _)).updateStateByKey(
     (newlist: Seq[String], oldstate: Option[String]) => newlist.headOption.orElse(oldstate))
 updatedResult.map(_._2)
   .checkpoint(Seconds(10))
   .foreachRDD((rdd, t) => {
     println("Deep: " + rdd.toDebugString.split("\n").length)
     println(t.toString() + ": " + rdd.collect.length)
   })
 streamingContext.start()
 streamingContext.awaitTermination()
 {code}
 From the output we can see that the dependency chain keeps growing over time, 
 the {{updateStateByKey}} stream never gets check-pointed, and finally the 
 stack overflow happens. 
 Note:
 * The RDD in {{updatedResult.map(_._2)}} does get check-pointed in this case, 
 but the one from {{updateStateByKey}} does not
 * If the {{checkpoint(Seconds(10))}} is removed from the map result 
 ({{updatedResult.map(_._2)}}), the stack overflow does not happen



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6180) Error logged into log4j when use the HiveMetastoreCatalog::tableExists

2015-03-22 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14375336#comment-14375336
 ] 

Jack Hu commented on SPARK-6180:


This issue has already been fixed by https://github.com/apache/spark/pull/4365

 Error logged into log4j when use the HiveMetastoreCatalog::tableExists
 --

 Key: SPARK-6180
 URL: https://issues.apache.org/jira/browse/SPARK-6180
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 1.2.1
Reporter: Jack Hu
Priority: Minor
  Labels: Hive, HiveMetastoreCatalog, spark, starter

 When using {{HiveMetastoreCatalog.tableExists}} to check a table that does 
 not exist in the Hive store, an error message like the one below is logged to 
 the log file, although the function returns {{false}} as desired. 
 To avoid this error log, one option is to use {{Hive.getTable(databaseName, 
 tblName, false)}} instead of {{Hive.getTable(databaseName, tblName)}}
 {quote}
 15/02/13 17:24:34 [Sql Query events] ERROR hive.ql.metadata.Hive: 
 NoSuchObjectException(message:default.demotable table not found)
   at 
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560)
   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
   at com.sun.proxy.$Proxy15.get_table(Unknown Source)
   at 
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997)
   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
   at com.sun.proxy.$Proxy16.getTable(Unknown Source)
   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976)
   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
   at 
 org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
   at 
 org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
   at 
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
   at 
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
   at scala.Option.getOrElse(Option.scala:120)
   at 
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
   at 
 org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253)
   at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
   at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   at 
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
   at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
   at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
   at 
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
   at 
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
   at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
   at 
 

[jira] [Closed] (SPARK-6180) Error logged into log4j when use the HiveMetastoreCatalog::tableExists

2015-03-22 Thread Jack Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu closed SPARK-6180.
--

Fixed

 Error logged into log4j when use the HiveMetastoreCatalog::tableExists
 --

 Key: SPARK-6180
 URL: https://issues.apache.org/jira/browse/SPARK-6180
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 1.2.1
Reporter: Jack Hu
Priority: Minor
  Labels: Hive, HiveMetastoreCatalog, spark, starter
 Fix For: 1.3.1


 When using {{HiveMetastoreCatalog.tableExists}} to check a table that does 
 not exist in the Hive store, an error message like the one below is logged to 
 the log file, although the function returns {{false}} as desired. 
 To avoid this error log, one option is to use {{Hive.getTable(databaseName, 
 tblName, false)}} instead of {{Hive.getTable(databaseName, tblName)}}
 {quote}
 15/02/13 17:24:34 [Sql Query events] ERROR hive.ql.metadata.Hive: 
 NoSuchObjectException(message:default.demotable table not found)
   at 
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560)
   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
   at com.sun.proxy.$Proxy15.get_table(Unknown Source)
   at 
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997)
   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
   at com.sun.proxy.$Proxy16.getTable(Unknown Source)
   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976)
   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
   at 
 org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
   at 
 org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
   at 
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
   at 
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
   at scala.Option.getOrElse(Option.scala:120)
   at 
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
   at 
 org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253)
   at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
   at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   at 
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
   at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
   at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
   at 
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
   at 
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
   at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
   at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137)
   at 
 

[jira] [Resolved] (SPARK-6180) Error logged into log4j when use the HiveMetastoreCatalog::tableExists

2015-03-22 Thread Jack Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu resolved SPARK-6180.

   Resolution: Fixed
Fix Version/s: 1.3.1

Fixed by this pull request: https://github.com/apache/spark/pull/4365

 Error logged into log4j when use the HiveMetastoreCatalog::tableExists
 --

 Key: SPARK-6180
 URL: https://issues.apache.org/jira/browse/SPARK-6180
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 1.2.1
Reporter: Jack Hu
Priority: Minor
  Labels: Hive, HiveMetastoreCatalog, spark, starter
 Fix For: 1.3.1


 When using {{HiveMetastoreCatalog.tableExists}} to check a table that does 
 not exist in the Hive store, an error message like the one below is logged to 
 the log file, although the function returns {{false}} as desired. 
 To avoid this error log, one option is to use {{Hive.getTable(databaseName, 
 tblName, false)}} instead of {{Hive.getTable(databaseName, tblName)}}
 {quote}
 15/02/13 17:24:34 [Sql Query events] ERROR hive.ql.metadata.Hive: 
 NoSuchObjectException(message:default.demotable table not found)
   at 
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560)
   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
   at com.sun.proxy.$Proxy15.get_table(Unknown Source)
   at 
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997)
   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
   at com.sun.proxy.$Proxy16.getTable(Unknown Source)
   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976)
   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
   at 
 org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
   at 
 org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
   at 
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
   at 
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
   at scala.Option.getOrElse(Option.scala:120)
   at 
 org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
   at 
 org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253)
   at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
   at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   at 
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
   at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
   at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
   at 
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
   at 
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
   at 
 org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
   at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
   at 
 

[jira] [Closed] (SPARK-3275) Socket receiver can not recover when the socket server restarted

2015-03-05 Thread Jack Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu closed SPARK-3275.
--
   Resolution: Fixed
Fix Version/s: 1.1.0

Checked in 1.1.0; did not find a similar issue. 

 Socket receiver can not recover when the socket server restarted 
 -

 Key: SPARK-3275
 URL: https://issues.apache.org/jira/browse/SPARK-3275
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.0.2
Reporter: Jack Hu
  Labels: failover
 Fix For: 1.1.0


 To reproduce this issue:
 1. create an application with a socket dstream
 2. start the socket server and start the application
 3. restart the socket server
 4. the socket dstream fails to reconnect (it closes the connection after a 
 successful connect)
 The main issue seems to be that the status in SocketReceiver and 
 ReceiverSupervisor is incorrect after the reconnect:
 In SocketReceiver::receive() the while loop is never entered after the 
 reconnect, since isStopped returns true:
   val iterator = bytesToObjects(socket.getInputStream())
   while (!isStopped && iterator.hasNext) {
     store(iterator.next)
   }
   logInfo("Stopped receiving")
   restart("Retrying connecting to " + host + ":" + port)
 That is because the status flag receiverState in ReceiverSupervisor is set to 
 Stopped when the connection is lost, and it is only reset back to Started after 
 the call to the receiver's start method:
 def startReceiver(): Unit = synchronized {
   try {
     logInfo("Starting receiver")
     receiver.onStart()
     logInfo("Called receiver onStart")
     onReceiverStart()
     receiverState = Started
   } catch {
     case t: Throwable =>
       stop("Error starting receiver " + streamId, Some(t))
   }
 }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6061) File source dstream can not include the old file which timestamp is before the system time

2015-03-04 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14348093#comment-14348093
 ] 

Jack Hu commented on SPARK-6061:


[~srowen] 
The issue is: I want to process old files with the file dstream, but the old 
files are still ignored even when {{newFilesOnly}} is set to {{false}}.
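
For context, here is a minimal sketch of the intended usage; the directory path, key/value types, filter, and the {{ssc}} StreamingContext are placeholders of mine, not taken from this JIRA:
{code}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Sketch only: ask the file dstream to also pick up files that already exist
// in the directory by passing newFilesOnly = false. "/data/input" is a
// placeholder path and ssc is an existing StreamingContext.
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/data/input", (path: Path) => true, newFilesOnly = false).map(_._2.toString)
{code}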

 File source dstream can not include the old file which timestamp is before 
 the system time
 --

 Key: SPARK-6061
 URL: https://issues.apache.org/jira/browse/SPARK-6061
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.2.1
Reporter: Jack Hu
  Labels: FileSourceDStream, OlderFiles, Streaming
   Original Estimate: 1m
  Remaining Estimate: 1m

 The file source dstream (StreamingContext.fileStream) has a property named 
 newFilesOnly to include old files. It worked fine with 1.1.0 but is broken in 
 1.2.1: the older files are always ignored, no matter what value is set.
 Here is simple reproduce code:
 https://gist.github.com/jhu-chang/1ee5b0788c7479414eeb
 The reason is that the modTimeIgnoreThreshold in 
 FileInputDStream::findNewFiles is set to a time close to the system time (the 
 Spark Streaming clock time), so files older than this time are ignored. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6061) File source dstream can not include the old file which timestamp is before the system time

2015-03-04 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14348230#comment-14348230
 ] 

Jack Hu commented on SPARK-6061:


[~tianyi]

Do you know why {{FileInputDStream.MIN_REMEMBER_DURATION}} was introduced in 
1.2.1 (actually, it was introduced in 1.1.1/1.2.0)? 

 File source dstream can not include the old file which timestamp is before 
 the system time
 --

 Key: SPARK-6061
 URL: https://issues.apache.org/jira/browse/SPARK-6061
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.2.1
Reporter: Jack Hu
  Labels: FileSourceDStream, OlderFiles, Streaming
   Original Estimate: 1m
  Remaining Estimate: 1m

 The file source dstream (StreamingContext.fileStream) has a property named 
 newFilesOnly to include the old files. It worked fine with 1.1.0 but is 
 broken in 1.2.1: the older files are always ignored no matter what value is 
 set.  
 Here is the simple reproduce code:
 https://gist.github.com/jhu-chang/1ee5b0788c7479414eeb
 The reason is that the modTimeIgnoreThreshold in 
 FileInputDStream::findNewFiles is set to a time close to the system time 
 (Spark Streaming clock time), so files older than this time are ignored. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6180) Error logged into log4j when use the HiveMetastoreCatalog::tableExists

2015-03-04 Thread Jack Hu (JIRA)
Jack Hu created SPARK-6180:
--

 Summary: Error logged into log4j when use the 
HiveMetastoreCatalog::tableExists
 Key: SPARK-6180
 URL: https://issues.apache.org/jira/browse/SPARK-6180
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 1.2.1
Reporter: Jack Hu
Priority: Minor


When using {{HiveMetastoreCatalog.tableExists}} to check a table that does not 
exist in the Hive store, an error message like the one below is logged to the 
log file, although the function returns {{false}} as desired. 

Could we avoid printing this error? (Although it can be disabled in log4j by 
configuration.)

{quote}
15/02/13 17:24:34 [Sql Query events] ERROR hive.ql.metadata.Hive: 
NoSuchObjectException(message:default.demotable table not found)
at 
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
at com.sun.proxy.$Proxy15.get_table(Unknown Source)
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997)

at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
at com.sun.proxy.$Proxy16.getTable(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
at 
org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
at 
org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
at 
org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at 

[jira] [Updated] (SPARK-6180) Error logged into log4j when use the HiveMetastoreCatalog::tableExists

2015-03-04 Thread Jack Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu updated SPARK-6180:
---
Description: 
When using {{HiveMetastoreCatalog.tableExists}} to check a table that does not 
exist in the Hive store, an error message like the one below is logged to the 
log file, although the function returns {{false}} as desired. 

To avoid this error log, one way is to use {{Hive.getTable(databaseName, 
tblName, false)}} instead of {{Hive.getTable(databaseName, tblName)}}.

{quote}
15/02/13 17:24:34 [Sql Query events] ERROR hive.ql.metadata.Hive: 
NoSuchObjectException(message:default.demotable table not found)
at 
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
at com.sun.proxy.$Proxy15.get_table(Unknown Source)
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997)

at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
at com.sun.proxy.$Proxy16.getTable(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
at 
org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
at 
org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
at 
org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at 

[jira] [Created] (SPARK-6061) File source dstream can not include the old file which timestamp is before the system time

2015-02-27 Thread Jack Hu (JIRA)
Jack Hu created SPARK-6061:
--

 Summary: File source dstream can not include the old file which 
timestamp is before the system time
 Key: SPARK-6061
 URL: https://issues.apache.org/jira/browse/SPARK-6061
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.2.1
Reporter: Jack Hu


The file source dstream (StreamingContext.fileStream) has a property named 
newFilesOnly to include the old files. It worked fine with 1.1.0 but is broken 
in 1.2.1: the older files are always ignored no matter what value is set.  

Here is the simple reproduce code:
https://gist.github.com/jhu-chang/1ee5b0788c7479414eeb

The reason is that the modTimeIgnoreThreshold in 
FileInputDStream::findNewFiles is set to a time close to the system time 
(Spark Streaming clock time), so files older than this time are ignored. 
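
To make that concrete, a simplified paraphrase of the check (illustrative names 
and values, not the actual FileInputDStream source):
{code:scala}
// Simplified paraphrase of the behavior described above (not the real code):
// even with newFilesOnly = false, taking the max with the remember-window
// lower bound pushes the effective threshold close to the current clock time,
// so files with older modification times are skipped.
object IgnoreThresholdSketch {
  val newFilesOnly = false
  val currentTime = System.currentTimeMillis()   // stands in for the streaming clock
  val rememberWindowMs = 60 * 1000L              // roughly the MIN_REMEMBER_DURATION window

  val initialThreshold = if (newFilesOnly) currentTime else 0L
  val modTimeIgnoreThreshold = math.max(initialThreshold, currentTime - rememberWindowMs)

  // A file is considered only if its modification time is at or after the threshold.
  def isConsidered(fileModTime: Long): Boolean = fileModTime >= modTimeIgnoreThreshold
}
{code}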



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3276) Provide a API to specify whether the old files need to be ignored in file input text DStream

2015-01-20 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14283662#comment-14283662
 ] 

Jack Hu commented on SPARK-3276:


In some cases, the old files (older than the current Spark system time) are
needed: if you have a fixed list in HDFS that you want to correlate with the
input stream, then you need to load it from the file system.

As for the newFilesOnly option, it is broken on Spark 1.2 (it works on 1.1).
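
For example, one common way to handle that use case (illustrative paths, port, 
and key format; not code from this issue) is to load the fixed file once as a 
static RDD and correlate it with the stream via transform + join:
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._   // pair-RDD functions (needed on older Spark)
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: reference data already sitting in HDFS is loaded once as a static
// RDD and joined against each batch of the incoming stream.
object CorrelateStaticSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("correlate").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // "key,value" reference file that already exists in HDFS (illustrative path)
    val reference = ssc.sparkContext
      .textFile("hdfs:///data/reference.csv")
      .map { line => val Array(k, v) = line.split(",", 2); (k, v) }

    // Incoming events keyed by their first comma-separated field
    val events = ssc.socketTextStream("localhost", 9999)
      .map(line => (line.split(",", 2)(0), line))

    // Correlate each batch with the fixed list
    val enriched = events.transform(rdd => rdd.join(reference))
    enriched.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}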




 Provide a API to specify whether the old files need to be ignored in file 
 input text DStream
 

 Key: SPARK-3276
 URL: https://issues.apache.org/jira/browse/SPARK-3276
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.2.0
Reporter: Jack Hu
Priority: Minor

 Currently, there is only one API, textFileStream in StreamingContext, to 
 create a text file dstream, and it always ignores the old files. Sometimes 
 the old files are still useful.
 We need an API to let the user choose whether the old files should be 
 ignored or not.
 The API currently in StreamingContext:
 def textFileStream(directory: String): DStream[String] = {
   fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
 }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3274) Spark Streaming Java API reports java.lang.ClassCastException when calling collectAsMap on JavaPairDStream

2014-08-27 Thread Jack Hu (JIRA)
Jack Hu created SPARK-3274:
--

 Summary: Spark Streaming Java API reports 
java.lang.ClassCastException when calling collectAsMap on JavaPairDStream
 Key: SPARK-3274
 URL: https://issues.apache.org/jira/browse/SPARK-3274
 Project: Spark
  Issue Type: Bug
  Components: Java API
Affects Versions: 1.0.2
Reporter: Jack Hu


Reproduce code:

scontext
    .socketTextStream("localhost", 1)
    .mapToPair(new PairFunction<String, String, String>() {
        public Tuple2<String, String> call(String arg0)
                throws Exception {
            // TODO Auto-generated method stub
            return new Tuple2<String, String>("1", arg0);
        }
    })
    .foreachRDD(new Function2<JavaPairRDD<String, String>, Time, Void>() {
        public Void call(JavaPairRDD<String, String> v1, Time v2)
                throws Exception {
            System.out.println(v2.toString() + ": " + v1.collectAsMap().toString());
            return null;
        }
    });

Exception:
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lscala.Tupl
e2;
at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.s
cala:447)
at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:
464)
at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:90)
at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:88)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachR
DD$2.apply(JavaDStreamLike.scala:282)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachR
DD$2.apply(JavaDStreamLike.scala:282)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mc
V$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(Fo
rEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(Fo
rEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobS



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3274) Spark Streaming Java API reports java.lang.ClassCastException when calling collectAsMap on JavaPairDStream

2014-08-27 Thread Jack Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Hu updated SPARK-3274:
---

Description: 
Reproduce code:

scontext
    .socketTextStream("localhost", 1)
    .mapToPair(new PairFunction<String, String, String>() {
        public Tuple2<String, String> call(String arg0)
                throws Exception {
            return new Tuple2<String, String>("1", arg0);
        }
    })
    .foreachRDD(new Function2<JavaPairRDD<String, String>, Time, Void>() {
        public Void call(JavaPairRDD<String, String> v1, Time v2)
                throws Exception {
            System.out.println(v2.toString() + ": " + v1.collectAsMap().toString());
            return null;
        }
    });

Exception:
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lscala.Tupl
e2;
at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.s
cala:447)
at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:
464)
at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:90)
at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:88)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachR
DD$2.apply(JavaDStreamLike.scala:282)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachR
DD$2.apply(JavaDStreamLike.scala:282)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mc
V$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(Fo
rEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(Fo
rEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobS

  was:
Reproduce code:

scontext
    .socketTextStream("localhost", 1)
    .mapToPair(new PairFunction<String, String, String>() {
        public Tuple2<String, String> call(String arg0)
                throws Exception {
            // TODO Auto-generated method stub
            return new Tuple2<String, String>("1", arg0);
        }
    })
    .foreachRDD(new Function2<JavaPairRDD<String, String>, Time, Void>() {
        public Void call(JavaPairRDD<String, String> v1, Time v2)
                throws Exception {
            System.out.println(v2.toString() + ": " + v1.collectAsMap().toString());
            return null;
        }
    });

Exception:
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lscala.Tupl
e2;
at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.s
cala:447)
at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:
464)
at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:90)
at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:88)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachR
DD$2.apply(JavaDStreamLike.scala:282)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachR
DD$2.apply(JavaDStreamLike.scala:282)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mc
V$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(Fo
rEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(Fo
rEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobS


 Spark Streaming Java API reports java.lang.ClassCastException when calling 
 collectAsMap on JavaPairDStream
 --

 Key: SPARK-3274
 URL: https://issues.apache.org/jira/browse/SPARK-3274
 Project: Spark
  Issue Type: Bug
  Components: Java API
Affects Versions: 1.0.2
Reporter: Jack Hu

 Reproduce code:
 scontext
   .socketTextStream("localhost", 1)
   .mapToPair(new PairFunction<String, String, String>() {
   public Tuple2<String, String> call(String arg0)
   throws Exception {
   return new Tuple2<String, String>("1", arg0);
  

[jira] [Created] (SPARK-3275) Socket receiver can not recover when the socket server restarted

2014-08-27 Thread Jack Hu (JIRA)
Jack Hu created SPARK-3275:
--

 Summary: Socket receiver can not recover when the socket server 
restarted 
 Key: SPARK-3275
 URL: https://issues.apache.org/jira/browse/SPARK-3275
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.0.2
Reporter: Jack Hu


To reproduce this issue:
1. create an application with a socket dstream
2. start the socket server and start the application
3. restart the socket server
4. the socket dstream will fail to reconnect (it will close the connection 
after a successful connect)

The main issue is that the status in SocketReceiver and ReceiverSupervisor is 
incorrect after the reconnect:
In SocketReceiver::receive(), the while loop will never be entered after 
reconnect since isStopped returns true:
 val iterator = bytesToObjects(socket.getInputStream())
  while(!isStopped && iterator.hasNext) {
    store(iterator.next)
  }
  logInfo("Stopped receiving")
  restart("Retrying connecting to " + host + ":" + port)

That is caused by the status flag receiverState in ReceiverSupervisor being 
set to Stopped when the connection is lost; it is only reset after the call to 
the Receiver's start method:

def startReceiver(): Unit = synchronized {
  try {
    logInfo("Starting receiver")
    receiver.onStart()
    logInfo("Called receiver onStart")
    onReceiverStart()
    receiverState = Started
  } catch {
    case t: Throwable =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3276) Provide a API to specify whether the old files need to be ignored in file input text DStream

2014-08-27 Thread Jack Hu (JIRA)
Jack Hu created SPARK-3276:
--

 Summary: Provide a API to specify whether the old files need to be 
ignored in file input text DStream
 Key: SPARK-3276
 URL: https://issues.apache.org/jira/browse/SPARK-3276
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.0.2
Reporter: Jack Hu


Currently, there is only one API, textFileStream in StreamingContext, to create 
a text file dstream, and it always ignores the old files. Sometimes the old 
files are still useful.
We need an API to let the user choose whether the old files should be ignored 
or not.

The API currently in StreamingContext:
def textFileStream(directory: String): DStream[String] = {
  fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
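
One possible shape for such an API, sketched here as a helper outside 
StreamingContext (not an actual Spark signature); it simply delegates to the 
existing fileStream overload that already accepts a path filter and a 
newFilesOnly flag:
{code:scala}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Sketch only: a text-file stream that lets the caller decide whether files
// already present in the directory should be processed (newFilesOnly = false)
// or ignored (newFilesOnly = true).
object TextFileStreamSketch {
  def textFileStream(ssc: StreamingContext,
                     directory: String,
                     newFilesOnly: Boolean): DStream[String] = {
    ssc.fileStream[LongWritable, Text, TextInputFormat](
      directory,
      (path: Path) => !path.getName.startsWith("."),  // skip hidden/temporary files
      newFilesOnly
    ).map(_._2.toString)
  }
}
{code}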



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org