[jira] [Created] (SPARK-39531) Show "Name" in Spark master UI for drivers
Jack Hu created SPARK-39531: --- Summary: Show "Name" in Spark master UI for drivers Key: SPARK-39531 URL: https://issues.apache.org/jira/browse/SPARK-39531 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.2.1 Reporter: Jack Hu For applications in the Spark master UI of a standalone cluster, the "Name" (spark.app.name) is shown for running and finished applications, but for drivers the master UI only shows the main class name. It is hard to map a class name back to an application name when there are many applications and drivers (in some cases the class name is the same but the application name differs in a shared cluster). Could the "Name" (spark.app.name) also be shown for drivers in the Spark master UI? -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36672) Using value from map in grouping sets result org.apache.spark.sql.AnalysisException
Jack Hu created SPARK-36672: --- Summary: Using value from map in grouping sets result org.apache.spark.sql.AnalysisException Key: SPARK-36672 URL: https://issues.apache.org/jira/browse/SPARK-36672 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.0 Reporter: Jack Hu Steps to reproduce: # create a table with map {code:java} create table test (int_value INT, dims MAP) using parquet{code} # Run following query: {code:java} select int_value, count(1) from test group by int_value, dims.dim_x, dims.dim_y grouping sets ( (int_value, dims.dim_x), (int_value, dims.dim_y)){code} The call stack: {noformat} org.apache.spark.sql.AnalysisException: dims#34[dim_x] AS dim_x#35 doesn't show up in the GROUP BY list ArrayBuffer(int_value#33 AS int_value#41, dims#34[dim_x] AS dim_x#37 AS dim_x#42, dims#34[dim_y] AS dim_y#38 AS dim_y#43); at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19$$anonfun$apply$49$$anonfun$21.apply(Analyzer.scala:387) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19$$anonfun$apply$49$$anonfun$21.apply(Analyzer.scala:387) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19$$anonfun$apply$49.apply(Analyzer.scala:386) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19$$anonfun$apply$49.apply(Analyzer.scala:385) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19.apply(Analyzer.scala:385) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$19.apply(Analyzer.scala:384) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$.constructExpand(Analyzer.scala:384) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveGroupingAnalytics$$constructAggregate(Analyzer.scala:448) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$apply$6.applyOrElse(Analyzer.scala:485) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$apply$6.applyOrElse(Analyzer.scala:473) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289) at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$.apply(Analyzer.scala:473) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$.apply(Analyzer.scala:287) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76) at scala.collection.immutable.List.foreach(List.scala:381)
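A possible workaround, not taken from the ticket: flatten the map entries into plain columns in a subquery so that the GROUPING SETS clause only references simple attributes. The sketch below reuses the table and key names from the reproduction above and assumes a running SparkSession.
{code:scala}
// Workaround sketch (not from the ticket): pre-project the map fields so the
// grouping columns are plain attributes instead of map-extraction expressions.
import org.apache.spark.sql.SparkSession

object GroupingSetsOnMapWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("grouping-sets-on-map").getOrCreate()

    spark.sql(
      """SELECT int_value, dim_x, dim_y, count(1)
        |FROM (
        |  SELECT int_value, dims['dim_x'] AS dim_x, dims['dim_y'] AS dim_y
        |  FROM test
        |) flattened
        |GROUP BY int_value, dim_x, dim_y
        |GROUPING SETS ((int_value, dim_x), (int_value, dim_y))
        |""".stripMargin).show()

    spark.stop()
  }
}
{code}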
[jira] [Updated] (SPARK-36252) Add log files rolling policy for driver running in cluster mode with spark standalone cluster
[ https://issues.apache.org/jira/browse/SPARK-36252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu updated SPARK-36252: Description: For a long running driver in cluster mode, there is no rolling policy, the log (stdout/stderr) may accupy lots of space, user needs an external tool to clean the old logs, it's not user friendly. For executor, following 5 configurations is used to control the log file rolling policy: {code:java} spark.executor.logs.rolling.maxRetainedFiles spark.executor.logs.rolling.enableCompression spark.executor.logs.rolling.maxSize spark.executor.logs.rolling.strategy spark.executor.logs.rolling.time.interval {code} For driver running in cluster mode: 1. reuse the executor settings 2. similar to executor: add following configurations (only works for stderr/stdout for driver in cluster mode) {code:java} spark.driver.logs.rolling.maxRetainedFiles spark.driver.logs.rolling.enableCompression spark.driver.logs.rolling.maxSize spark.driver.logs.rolling.strategy spark.driver.logs.rolling.time.interval {code} #2 seems better, do you agree? was: For a long running driver in cluster mode, there is no rolling policy, the log (stdout/stderr) may accupy lots of space, user needs a external tool to clean the old logs, it's not friendly. For executor, following 5 configurations is used to control the log file rolling policy: {code:java} spark.executor.logs.rolling.maxRetainedFiles spark.executor.logs.rolling.enableCompression spark.executor.logs.rolling.maxSize spark.executor.logs.rolling.strategy spark.executor.logs.rolling.time.interval {code} For driver running in cluster mode: 1. reuse the executor settings 2. similar to executor: add following configurations (only works for stderr/stdout for driver in cluster mode) {code:java} spark.driver.logs.rolling.maxRetainedFiles spark.driver.logs.rolling.enableCompression spark.driver.logs.rolling.maxSize spark.driver.logs.rolling.strategy spark.driver.logs.rolling.time.interval {code} #2 seems better, do you agree? > Add log files rolling policy for driver running in cluster mode with spark > standalone cluster > - > > Key: SPARK-36252 > URL: https://issues.apache.org/jira/browse/SPARK-36252 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.1.2 >Reporter: Jack Hu >Priority: Major > > For a long running driver in cluster mode, there is no rolling policy, the > log (stdout/stderr) may accupy lots of space, user needs an external tool to > clean the old logs, it's not user friendly. > For executor, following 5 configurations is used to control the log file > rolling policy: > {code:java} > spark.executor.logs.rolling.maxRetainedFiles > spark.executor.logs.rolling.enableCompression > spark.executor.logs.rolling.maxSize > spark.executor.logs.rolling.strategy > spark.executor.logs.rolling.time.interval > {code} > For driver running in cluster mode: > 1. reuse the executor settings > 2. similar to executor: add following configurations (only works for > stderr/stdout for driver in cluster mode) > {code:java} > spark.driver.logs.rolling.maxRetainedFiles > spark.driver.logs.rolling.enableCompression > spark.driver.logs.rolling.maxSize > spark.driver.logs.rolling.strategy > spark.driver.logs.rolling.time.interval > {code} > #2 seems better, do you agree? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36252) Add log files rolling policy for driver running in cluster mode with spark standalone cluster
Jack Hu created SPARK-36252: --- Summary: Add log files rolling policy for driver running in cluster mode with spark standalone cluster Key: SPARK-36252 URL: https://issues.apache.org/jira/browse/SPARK-36252 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.1.2 Reporter: Jack Hu For a long running driver in cluster mode there is no rolling policy, so the log (stdout/stderr) may occupy a lot of space and the user needs an external tool to clean the old logs, which is not user friendly. For executors, the following 5 configurations are used to control the log file rolling policy: {code:java} spark.executor.logs.rolling.maxRetainedFiles spark.executor.logs.rolling.enableCompression spark.executor.logs.rolling.maxSize spark.executor.logs.rolling.strategy spark.executor.logs.rolling.time.interval {code} For a driver running in cluster mode, two options: 1. reuse the executor settings 2. similar to the executor: add the following configurations (applying only to the stderr/stdout of a driver in cluster mode) {code:java} spark.driver.logs.rolling.maxRetainedFiles spark.driver.logs.rolling.enableCompression spark.driver.logs.rolling.maxSize spark.driver.logs.rolling.strategy spark.driver.logs.rolling.time.interval {code} #2 seems better, do you agree? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
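For reference, a sketch of how these settings would be set from code. The spark.executor.logs.rolling.* keys already exist; the spark.driver.logs.rolling.* keys are only the proposal in this ticket (option 2) and do not exist yet. Values are examples.
{code:scala}
// Illustrative values only; the driver-side keys below are proposed, not real.
import org.apache.spark.SparkConf

object RollingLogConfSketch {
  def build(): SparkConf = new SparkConf()
    // existing executor log rolling settings (standalone mode)
    .set("spark.executor.logs.rolling.strategy", "size")
    .set("spark.executor.logs.rolling.maxSize", (128 * 1024 * 1024).toString) // bytes
    .set("spark.executor.logs.rolling.maxRetainedFiles", "10")
    .set("spark.executor.logs.rolling.enableCompression", "true")
    // proposed driver equivalents for stdout/stderr of cluster-mode drivers
    .set("spark.driver.logs.rolling.strategy", "size")
    .set("spark.driver.logs.rolling.maxSize", (128 * 1024 * 1024).toString)
    .set("spark.driver.logs.rolling.maxRetainedFiles", "10")
    .set("spark.driver.logs.rolling.enableCompression", "true")
}
{code}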
[jira] [Comment Edited] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure
[ https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372425#comment-17372425 ] Jack Hu edited comment on SPARK-35027 at 7/1/21, 6:42 AM: -- Of course, the "stop" in FileAppender does nothing but set a flag. The exception will be thrown in "[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]";, but the cloure in finally only closes the [output stream (to file)|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L79], but leave the "inputStream" open., which is the pipe's output stream. was (Author: jhu): Of course, the "stop" in FileAppender does nothing but set a flag. The exception will be thrown in "[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]";, but the cloure in finally only closes the [output stream (to file)|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L79]], but leave the "inputStream" open., which is the pipe's output stream. > Close the inputStream in FileAppender when writing the logs failure > --- > > Key: SPARK-35027 > URL: https://issues.apache.org/jira/browse/SPARK-35027 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Jack Hu >Priority: Major > > In Spark Cluster, the ExecutorRunner uses FileAppender to redirect the > stdout/stderr of executors to file, when the writing processing is failure > due to some reasons: disk full, the FileAppender will only close the input > stream to file, but leave the pipe's stdout/stderr open, following writting > operation in executor side may be hung. > need to close the inputStream in FileAppender ? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure
[ https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372425#comment-17372425 ] Jack Hu edited comment on SPARK-35027 at 7/1/21, 6:42 AM: -- Of course, the "stop" in FileAppender does nothing but set a flag. The exception will be thrown in "[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]";, but the cloure in finally only closes the [output stream (to file)|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L79]], but leave the "inputStream" open., which is the pipe's output stream. was (Author: jhu): Of course, the "stop" in FileAppender does nothing but set a flag. The exception will be thrown in "[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]";, but the cloure in finally only closes the output stream (to file), [but leave the "inp|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L79]utStream"; open., which is the pipe's output stream. > Close the inputStream in FileAppender when writing the logs failure > --- > > Key: SPARK-35027 > URL: https://issues.apache.org/jira/browse/SPARK-35027 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Jack Hu >Priority: Major > > In Spark Cluster, the ExecutorRunner uses FileAppender to redirect the > stdout/stderr of executors to file, when the writing processing is failure > due to some reasons: disk full, the FileAppender will only close the input > stream to file, but leave the pipe's stdout/stderr open, following writting > operation in executor side may be hung. > need to close the inputStream in FileAppender ? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure
[ https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372425#comment-17372425 ] Jack Hu edited comment on SPARK-35027 at 7/1/21, 6:40 AM: -- Of course, the "stop" in FileAppender does nothing but set a flag. The exception will be thrown in "[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]";, but the cloure in finally only closes the output stream (to file), [but leave the "inp|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L79]utStream"; open., which is the pipe's output stream. was (Author: jhu): Of course, the "stop" in FileAppender does nothing but set a flag. The exception will be thrown in "[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]";, but the cloure in finally only closes the output stream (to file), but leave the "inputStream" open., which is the pipe's output stream. > Close the inputStream in FileAppender when writing the logs failure > --- > > Key: SPARK-35027 > URL: https://issues.apache.org/jira/browse/SPARK-35027 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Jack Hu >Priority: Major > > In Spark Cluster, the ExecutorRunner uses FileAppender to redirect the > stdout/stderr of executors to file, when the writing processing is failure > due to some reasons: disk full, the FileAppender will only close the input > stream to file, but leave the pipe's stdout/stderr open, following writting > operation in executor side may be hung. > need to close the inputStream in FileAppender ? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure
[ https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372425#comment-17372425 ] Jack Hu edited comment on SPARK-35027 at 7/1/21, 6:39 AM: -- Of course, the "stop" in FileAppender does nothing but set a flag. The exception will be thrown in "[appendStreamToFile|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala#L59]";, but the cloure in finally only closes the output stream (to file), but leave the "inputStream" open., which is the pipe's output stream. was (Author: jhu): Of course, the "stop" in FileAppender does nothing but set a flag. The exception will be thrown in "appendStreamToFile", but the cloure in finally only closes the output stream (to file), but leave the "inputStream" open., which is the pipe's output stream. > Close the inputStream in FileAppender when writing the logs failure > --- > > Key: SPARK-35027 > URL: https://issues.apache.org/jira/browse/SPARK-35027 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Jack Hu >Priority: Major > > In Spark Cluster, the ExecutorRunner uses FileAppender to redirect the > stdout/stderr of executors to file, when the writing processing is failure > due to some reasons: disk full, the FileAppender will only close the input > stream to file, but leave the pipe's stdout/stderr open, following writting > operation in executor side may be hung. > need to close the inputStream in FileAppender ? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure
[ https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372425#comment-17372425 ] Jack Hu commented on SPARK-35027: - Of course, the "stop" in FileAppender does nothing but set a flag. The exception will be thrown in "appendStreamToFile", but the closure in the finally block only closes the output stream (to the file) and leaves the "inputStream", which is the pipe's output stream, open. > Close the inputStream in FileAppender when writing the logs failure > --- > > Key: SPARK-35027 > URL: https://issues.apache.org/jira/browse/SPARK-35027 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Jack Hu >Priority: Major > > In Spark Cluster, the ExecutorRunner uses FileAppender to redirect the > stdout/stderr of executors to file, when the writing processing is failure > due to some reasons: disk full, the FileAppender will only close the input > stream to file, but leave the pipe's stdout/stderr open, following writting > operation in executor side may be hung. > need to close the inputStream in FileAppender ? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
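To make the suggestion concrete, a minimal sketch (not the actual FileAppender code) of an append loop whose finally block closes the input (pipe) stream as well as the output file stream, so a failed write does not leave the writing side of the pipe blocked:
{code:scala}
// Minimal sketch, not the real FileAppender: close both ends when the copy
// loop exits, including when appending to the file throws (e.g. disk full).
import java.io.{InputStream, OutputStream}

object CloseBothStreamsSketch {
  def copyPipeToFile(inputStream: InputStream, openOutput: () => OutputStream): Unit = {
    var out: OutputStream = null
    try {
      out = openOutput()
      val buf = new Array[Byte](8192)
      var n = inputStream.read(buf)
      while (n != -1) {
        out.write(buf, 0, n) // may throw when the disk is full
        n = inputStream.read(buf)
      }
    } finally {
      try {
        if (out != null) out.close()
      } finally {
        inputStream.close() // the suggested addition: also close the pipe side
      }
    }
  }
}
{code}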
[jira] [Updated] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure
[ https://issues.apache.org/jira/browse/SPARK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu updated SPARK-35027: Description: In Spark Cluster, the ExecutorRunner uses FileAppender to redirect the stdout/stderr of executors to file, when the writing processing is failure due to some reasons: disk full, the FileAppender will only close the input stream to file, but leave the pipe's stdout/stderr open, following writting operation in executor side may be hung. need to close the inputStream in FileAppender ? was: The ExecutorRunner uses FileAppender to redirect the stdout/stderr of executors to file, when the writing processing is failure due to some reasons: disk full, the FileAppender will only close the input stream to file, but leave the pipe's stdout/stderr open, following writting operation in executor side may be hung. need to close the inputStream in FileAppender ? > Close the inputStream in FileAppender when writing the logs failure > --- > > Key: SPARK-35027 > URL: https://issues.apache.org/jira/browse/SPARK-35027 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Jack Hu >Priority: Major > > In Spark Cluster, the ExecutorRunner uses FileAppender to redirect the > stdout/stderr of executors to file, when the writing processing is failure > due to some reasons: disk full, the FileAppender will only close the input > stream to file, but leave the pipe's stdout/stderr open, following writting > operation in executor side may be hung. > need to close the inputStream in FileAppender ? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35027) Close the inputStream in FileAppender when writing the logs failure
Jack Hu created SPARK-35027: --- Summary: Close the inputStream in FileAppender when writing the logs failure Key: SPARK-35027 URL: https://issues.apache.org/jira/browse/SPARK-35027 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.1.1 Reporter: Jack Hu The ExecutorRunner uses FileAppender to redirect the stdout/stderr of executors to a file. When writing fails for some reason (e.g. the disk is full), the FileAppender only closes the stream to the file but leaves the pipe's stdout/stderr open, so subsequent write operations on the executor side may hang. Should the inputStream also be closed in FileAppender? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21480) Memory leak in org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult
[ https://issues.apache.org/jira/browse/SPARK-21480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16094351#comment-16094351 ] Jack Hu commented on SPARK-21480: - The issue seems resovled in latest HIVE: [HIVE-15551|https://issues.apache.org/jira/browse/HIVE-15551] > Memory leak in > org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult > -- > > Key: SPARK-21480 > URL: https://issues.apache.org/jira/browse/SPARK-21480 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 >Reporter: Jack Hu > > There is memory leak in hive with mysql in > {{org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult}}, > which create sql statements, but not close it. > Here is the simple one to recreate it: > {code:java} > val port = 1 > val stream = ssc.socketTextStream("host", port).map(x => > (x,x)).updateStateByKey( > (inputs : Seq[String], s : Option[String]) => > inputs.lastOption.orElse(s) > ) > stream.foreachRDD((rdd, t) => { > > hiveContext.sparkSession.createDataFrame(rdd).write.mode("overwrite").saveAsTable("t") > } > {code} > Here is the hive settings > {code} > hive.metastore.warehouse.dir=file:///user/hive/warehouse > javax.jdo.option.ConnectionURL=jdbc:mysql://ip:3306/hive > spark.sql.warehouse.dir=file:///user/hive/warehouse > javax.jdo.option.ConnectionDriveName=com.mysql.jdbc.Driver > javax.jdo.option.ConnectionUserName=hive > javax.jdo.option.ConnectionPassword=hive > hive.exec.dynamic.partition.mode=nonstrict > {code} > After execute a while, there are many instances of > {{com.mysql.jdbc.JDBC42ResultSet}} and {{com.mysql.jdbc.StatementImpl}} and > keep increasing. > After attache a debugger, we found the statements create in > {{org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult}} but > not closed > {code:java} > private void executeNoResult(final String queryText) throws SQLException { > JDOConnection jdoConn = pm.getDataStoreConnection(); > boolean doTrace = LOG.isDebugEnabled(); > try { > long start = doTrace ? System.nanoTime() : 0; > > ((Connection)jdoConn.getNativeConnection()).createStatement().execute(queryText); > timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0); > } finally { > jdoConn.close(); // We must release the connection before we call other > pm methods. 
> } > } > {code} > The reference call stack is > {code:java} > at com.mysql.jdbc.JDBC42ResultSet.(JDBC42ResultSet.java:44) > at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown > Source) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:422) > at com.mysql.jdbc.Util.handleNewInstance(Util.java:404) > at com.mysql.jdbc.ResultSetImpl.getInstance(ResultSetImpl.java:319) > at com.mysql.jdbc.MysqlIO.buildResultSetWithUpdates(MysqlIO.java:3114) > at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3014) > at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2280) > at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673) > at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2546) > at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2504) > at com.mysql.jdbc.StatementImpl.executeInternal(StatementImpl.java:840) > at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:740) > at com.jolbox.bonecp.StatementHandle.execute(StatementHandle.java:254) > at > org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult(MetaStoreDirectSql.java:233) > at > org.apache.hadoop.hive.metastore.MetaStoreDirectSql.doDbSpecificInitializationsBeforeQuery(MetaStoreDirectSql.java:222) > at > org.apache.hadoop.hive.metastore.MetaStoreDirectSql.getDatabase(MetaStoreDirectSql.java:263) > at > org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:578) > at > org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:575) > at > org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2385) > at > org.apache.hadoop.hive.metastore.ObjectStore.getDatabaseInternal(ObjectStore.java:575) > at > org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:559) > at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114) >
[jira] [Created] (SPARK-21480) Memory leak in org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult
Jack Hu created SPARK-21480: --- Summary: Memory leak in org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult Key: SPARK-21480 URL: https://issues.apache.org/jira/browse/SPARK-21480 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.1 Reporter: Jack Hu There is a memory leak in Hive with MySQL in {{org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult}}, which creates SQL statements but does not close them. Here is a simple program to reproduce it: {code:java} val port = 1 val stream = ssc.socketTextStream("host", port).map(x => (x,x)).updateStateByKey( (inputs : Seq[String], s : Option[String]) => inputs.lastOption.orElse(s) ) stream.foreachRDD((rdd, t) => { hiveContext.sparkSession.createDataFrame(rdd).write.mode("overwrite").saveAsTable("t") }) {code} Here are the Hive settings: {code} hive.metastore.warehouse.dir=file:///user/hive/warehouse javax.jdo.option.ConnectionURL=jdbc:mysql://ip:3306/hive spark.sql.warehouse.dir=file:///user/hive/warehouse javax.jdo.option.ConnectionDriveName=com.mysql.jdbc.Driver javax.jdo.option.ConnectionUserName=hive javax.jdo.option.ConnectionPassword=hive hive.exec.dynamic.partition.mode=nonstrict {code} After executing for a while, there are many instances of {{com.mysql.jdbc.JDBC42ResultSet}} and {{com.mysql.jdbc.StatementImpl}}, and the count keeps increasing. After attaching a debugger, we found the statements are created in {{org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult}} but never closed: {code:java} private void executeNoResult(final String queryText) throws SQLException { JDOConnection jdoConn = pm.getDataStoreConnection(); boolean doTrace = LOG.isDebugEnabled(); try { long start = doTrace ? System.nanoTime() : 0; ((Connection)jdoConn.getNativeConnection()).createStatement().execute(queryText); timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0); } finally { jdoConn.close(); // We must release the connection before we call other pm methods. 
} } {code} The reference call stack is {code:java} at com.mysql.jdbc.JDBC42ResultSet.(JDBC42ResultSet.java:44) at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at com.mysql.jdbc.Util.handleNewInstance(Util.java:404) at com.mysql.jdbc.ResultSetImpl.getInstance(ResultSetImpl.java:319) at com.mysql.jdbc.MysqlIO.buildResultSetWithUpdates(MysqlIO.java:3114) at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3014) at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2280) at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2546) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2504) at com.mysql.jdbc.StatementImpl.executeInternal(StatementImpl.java:840) at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:740) at com.jolbox.bonecp.StatementHandle.execute(StatementHandle.java:254) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult(MetaStoreDirectSql.java:233) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.doDbSpecificInitializationsBeforeQuery(MetaStoreDirectSql.java:222) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.getDatabase(MetaStoreDirectSql.java:263) at org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:578) at org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:575) at org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2385) at org.apache.hadoop.hive.metastore.ObjectStore.getDatabaseInternal(ObjectStore.java:575) at org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:559) at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114) at com.sun.proxy.$Proxy9.getDatabase(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_database_core(HiveMetaStore.java:956) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_database(HiveMetaStore.java:930) at sun.reflect.GeneratedMethodAccessor62.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.i
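The leaked objects come from the Statement that is created inline and never closed. Below is a generic sketch of the usual fix pattern, keeping a handle on the Statement and closing it in a finally block; it is written in Scala for illustration, while the actual metastore code is Java.
{code:scala}
// Generic sketch of the fix pattern, not the actual metastore code: keep a
// handle on the Statement and always close it, which also frees its ResultSets.
import java.sql.Connection

object StatementClosingSketch {
  def executeNoResult(conn: Connection, queryText: String): Unit = {
    val stmt = conn.createStatement()
    try {
      stmt.execute(queryText)
    } finally {
      stmt.close()
    }
  }
}
{code}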
[jira] [Updated] (SPARK-16413) The JDBC UI does not show the job id when spark.sql.thriftServer.incrementalCollect=true
[ https://issues.apache.org/jira/browse/SPARK-16413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu updated SPARK-16413: Attachment: no_job_id.png ThriftServer2 does not show the "job id" > The JDBC UI does not show the job id when > spark.sql.thriftServer.incrementalCollect=true > > > Key: SPARK-16413 > URL: https://issues.apache.org/jira/browse/SPARK-16413 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.2 >Reporter: Jack Hu > Labels: SQL, ThriftServer2, UI > Attachments: no_job_id.png > > > When set {{spark.sql.thriftServer.incrementalCollect=true}}, the Hive Thrift > Server2 UI can not show the "job id" in SQL statistics. > To reproduce this, run this > {code} > spark-submit --conf spark.sql.thriftServer.incrementalCollect=true --class > org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 > {code} > then submit some sql. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-16413) The JDBC UI does not show the job id when spark.sql.thriftServer.incrementalCollect=true
Jack Hu created SPARK-16413: --- Summary: The JDBC UI does not show the job id when spark.sql.thriftServer.incrementalCollect=true Key: SPARK-16413 URL: https://issues.apache.org/jira/browse/SPARK-16413 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.2 Reporter: Jack Hu When {{spark.sql.thriftServer.incrementalCollect=true}} is set, the Hive Thrift Server2 UI cannot show the "job id" in the SQL statistics. To reproduce this, run {code} spark-submit --conf spark.sql.thriftServer.incrementalCollect=true --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 {code} then submit some SQL. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15116545#comment-15116545 ] Jack Hu commented on SPARK-6847: Hi [~zsxwing] Even if the user does not explicitly checkpoint after {{updateStateByKey}}, this issue will still happen in the following cases (a minimal sketch of case 1 is shown after this message): # {{updateStateByKey().filter().updateStateByKey()}} # {{updateStateByKey().filter().reduceByKeyAndWindow(reduce, inreduce, ...)}} # {{reduceByKeyAndWindow(reduce, inreduce, ...).filter().updateStateByKey()}} If there is no plan to fix this issue, maybe a workaround/warning should be given to users for such usage. It will be very hard to find the real cause if the application is complex. > Stack overflow on updateStateByKey which followed by a dstream with > checkpoint set > -- > > Key: SPARK-6847 > URL: https://issues.apache.org/jira/browse/SPARK-6847 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: StackOverflowError, Streaming > > The issue happens with the following sample code: uses {{updateStateByKey}} > followed by a {{map}} with checkpoint interval 10 seconds > {code} > val sparkConf = new SparkConf().setAppName("test") > val streamingContext = new StreamingContext(sparkConf, Seconds(10)) > streamingContext.checkpoint("""checkpoint""") > val source = streamingContext.socketTextStream("localhost", ) > val updatedResult = source.map( > (1,_)).updateStateByKey( > (newlist : Seq[String], oldstate : Option[String]) => > newlist.headOption.orElse(oldstate)) > updatedResult.map(_._2) > .checkpoint(Seconds(10)) > .foreachRDD((rdd, t) => { > println("Deep: " + rdd.toDebugString.split("\n").length) > println(t.toString() + ": " + rdd.collect.length) > }) > streamingContext.start() > streamingContext.awaitTermination() > {code} > From the output, we can see that the dependency will be increasing time over > time, the {{updateStateByKey}} never get check-pointed, and finally, the > stack overflow will happen. > Note: > * The rdd in {{updatedResult.map(_._2)}} get check-pointed in this case, but > not the {{updateStateByKey}} > * If remove the {{checkpoint(Seconds(10))}} from the map result ( > {{updatedResult.map(_._2)}} ), the stack overflow will not happen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
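A minimal sketch of case 1 above, with example batch interval, checkpoint directory, host and port: two stateful DStreams (both requiring checkpointing) in one lineage, where only the last one has an output.
{code:scala}
// Sketch of case 1: updateStateByKey().filter().updateStateByKey().
// Batch interval, checkpoint directory, host and port are example values.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DoubleUpdateStateByKeySketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("case-1"), Seconds(10))
    ssc.checkpoint("checkpoint")

    val source = ssc.socketTextStream("localhost", 9999).map((1, _))

    val first = source.updateStateByKey(
      (values: Seq[String], state: Option[String]) => values.headOption.orElse(state))

    val second = first.filter(_._2.nonEmpty).updateStateByKey(
      (values: Seq[String], state: Option[String]) => values.headOption.orElse(state))

    // Only the second stateful DStream is materialized by an output; per the
    // comment above, the first one's checkpoint never truncates the lineage.
    second.foreachRDD((rdd, time) => println(s"$time: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}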
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114722#comment-15114722 ] Jack Hu commented on SPARK-6847: Test on latest 1.6 branch (f913f7e [SPARK-12120][PYSPARK] Improve exception message when failing to init), it still exists. > Stack overflow on updateStateByKey which followed by a dstream with > checkpoint set > -- > > Key: SPARK-6847 > URL: https://issues.apache.org/jira/browse/SPARK-6847 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: StackOverflowError, Streaming > > The issue happens with the following sample code: uses {{updateStateByKey}} > followed by a {{map}} with checkpoint interval 10 seconds > {code} > val sparkConf = new SparkConf().setAppName("test") > val streamingContext = new StreamingContext(sparkConf, Seconds(10)) > streamingContext.checkpoint("""checkpoint""") > val source = streamingContext.socketTextStream("localhost", ) > val updatedResult = source.map( > (1,_)).updateStateByKey( > (newlist : Seq[String], oldstate : Option[String]) => > newlist.headOption.orElse(oldstate)) > updatedResult.map(_._2) > .checkpoint(Seconds(10)) > .foreachRDD((rdd, t) => { > println("Deep: " + rdd.toDebugString.split("\n").length) > println(t.toString() + ": " + rdd.collect.length) > }) > streamingContext.start() > streamingContext.awaitTermination() > {code} > From the output, we can see that the dependency will be increasing time over > time, the {{updateStateByKey}} never get check-pointed, and finally, the > stack overflow will happen. > Note: > * The rdd in {{updatedResult.map(_._2)}} get check-pointed in this case, but > not the {{updateStateByKey}} > * If remove the {{checkpoint(Seconds(10))}} from the map result ( > {{updatedResult.map(_._2)}} ), the stack overflow will not happen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111849#comment-15111849 ] Jack Hu commented on SPARK-6847: Hi [~zsxwing] I just test a simple case with 1.6, it still exists: {code} batch interval = 2 seconds source.updateStateByKey(func).map(f).checkpoint(2 seconds) {code} > Stack overflow on updateStateByKey which followed by a dstream with > checkpoint set > -- > > Key: SPARK-6847 > URL: https://issues.apache.org/jira/browse/SPARK-6847 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: StackOverflowError, Streaming > > The issue happens with the following sample code: uses {{updateStateByKey}} > followed by a {{map}} with checkpoint interval 10 seconds > {code} > val sparkConf = new SparkConf().setAppName("test") > val streamingContext = new StreamingContext(sparkConf, Seconds(10)) > streamingContext.checkpoint("""checkpoint""") > val source = streamingContext.socketTextStream("localhost", ) > val updatedResult = source.map( > (1,_)).updateStateByKey( > (newlist : Seq[String], oldstate : Option[String]) => > newlist.headOption.orElse(oldstate)) > updatedResult.map(_._2) > .checkpoint(Seconds(10)) > .foreachRDD((rdd, t) => { > println("Deep: " + rdd.toDebugString.split("\n").length) > println(t.toString() + ": " + rdd.collect.length) > }) > streamingContext.start() > streamingContext.awaitTermination() > {code} > From the output, we can see that the dependency will be increasing time over > time, the {{updateStateByKey}} never get check-pointed, and finally, the > stack overflow will happen. > Note: > * The rdd in {{updatedResult.map(_._2)}} get check-pointed in this case, but > not the {{updateStateByKey}} > * If remove the {{checkpoint(Seconds(10))}} from the map result ( > {{updatedResult.map(_._2)}} ), the stack overflow will not happen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-11749) Duplicate creating the RDD in file stream when recovering from checkpoint data
Jack Hu created SPARK-11749: --- Summary: Duplicate creating the RDD in file stream when recovering from checkpoint data Key: SPARK-11749 URL: https://issues.apache.org/jira/browse/SPARK-11749 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.5.0, 1.5.2 Reporter: Jack Hu I have a case that monitors an HDFS folder, enriches the incoming data with different reference tables (about 15 of them), and writes to different Hive tables after some operations. The code is like this: {code} val txt = ssc.textFileStream(folder).map(toKeyValuePair).reduceByKey(removeDuplicates) val refTable1 = ssc.textFileStream(refSource1).map(parse(_)).updateStateByKey(...) txt.join(refTable1).map(..).reduceByKey(...).foreachRDD( rdd => { // insert into hive table } ) val refTable2 = ssc.textFileStream(refSource2).map(parse(_)).updateStateByKey(...) txt.join(refTable2).map(..).reduceByKey(...).foreachRDD( rdd => { // insert into hive table } ) /// more refTables in following code {code} The {{batchInterval}} of this application is set to *30 seconds*, the checkpoint interval is set to *10 minutes*, and every batch in {{txt}} has *60 files*. After recovering from checkpoint data, I can see lots of log lines about creating the RDDs of the file stream: the RDD for each batch of the file stream was recreated *15 times*, and it takes about *5 minutes* to create that many file RDDs. During this period, *10K+ broadcasts* were created and used almost all of the block manager space. After some investigation, we found that {{DStream.restoreCheckpointData}} is invoked once per output ({{DStream.foreachRDD}} in this case) and there is no flag to indicate that the {{DStream}} has already been restored, so the RDDs of the file stream are recreated. I suggest adding a flag to control the restore process and avoid the duplicated work. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
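A sketch of the suggested flag (hypothetical, not the actual DStream code): remember whether a DStream has already restored its checkpoint data, so repeated calls from multiple outputs become cheap no-ops.
{code:scala}
// Hypothetical sketch of the suggested guard; the real DStream class differs.
abstract class RestorableDStreamSketch {
  @volatile private var restoredFromCheckpointData = false

  /** Recreates the RDDs of this DStream from its checkpoint information. */
  protected def doRestoreCheckpointData(): Unit

  /** Called once per output; the flag makes repeated calls cheap no-ops. */
  def restoreCheckpointData(): Unit = synchronized {
    if (!restoredFromCheckpointData) {
      doRestoreCheckpointData()
      restoredFromCheckpointData = true
    }
  }
}
{code}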
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14951593#comment-14951593 ] Jack Hu commented on SPARK-6847: Hi [~glyton.camilleri] You can check whether there are two dstreams in the DAG need to be checkpointed (updateStateByKey, reduceByKeyAndWindow), it yes, you can workaround this to use some output for the previous DStream which needs to checkpointed. {code} val d1 = input.updateStateByKey(func) val d2 = d1.map(...).updateStateByKey(func) d2.foreachRDD(rdd => print(rdd.count)) /// workaround the stack over flow listed in this JIRA d1.foreachRDD(rdd => rdd.foreach(_ => Unit)) {code} > Stack overflow on updateStateByKey which followed by a dstream with > checkpoint set > -- > > Key: SPARK-6847 > URL: https://issues.apache.org/jira/browse/SPARK-6847 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: StackOverflowError, Streaming > > The issue happens with the following sample code: uses {{updateStateByKey}} > followed by a {{map}} with checkpoint interval 10 seconds > {code} > val sparkConf = new SparkConf().setAppName("test") > val streamingContext = new StreamingContext(sparkConf, Seconds(10)) > streamingContext.checkpoint("""checkpoint""") > val source = streamingContext.socketTextStream("localhost", ) > val updatedResult = source.map( > (1,_)).updateStateByKey( > (newlist : Seq[String], oldstate : Option[String]) => > newlist.headOption.orElse(oldstate)) > updatedResult.map(_._2) > .checkpoint(Seconds(10)) > .foreachRDD((rdd, t) => { > println("Deep: " + rdd.toDebugString.split("\n").length) > println(t.toString() + ": " + rdd.collect.length) > }) > streamingContext.start() > streamingContext.awaitTermination() > {code} > From the output, we can see that the dependency will be increasing time over > time, the {{updateStateByKey}} never get check-pointed, and finally, the > stack overflow will happen. > Note: > * The rdd in {{updatedResult.map(_._2)}} get check-pointed in this case, but > not the {{updateStateByKey}} > * If remove the {{checkpoint(Seconds(10))}} from the map result ( > {{updatedResult.map(_._2)}} ), the stack overflow will not happen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-10772) NullPointerException when transform function in DStream returns NULL
Jack Hu created SPARK-10772: --- Summary: NullPointerException when transform function in DStream returns NULL Key: SPARK-10772 URL: https://issues.apache.org/jira/browse/SPARK-10772 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.5.0, 1.4.1 Reporter: Jack Hu Priority: Minor NullPointerException raises when transform function returns NULL: {quote} java.lang.NullPointerException at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$3.apply(DStream.scala:442) at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$3.apply(DStream.scala:441) at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107) at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107) at org.apache.spark.streaming.dstream.DStream.clearMetadata(DStream.scala:441) at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$5.apply(DStream.scala:454) at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$5.apply(DStream.scala:454) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.streaming.dstream.DStream.clearMetadata(DStream.scala:454) at org.apache.spark.streaming.DStreamGraph$$anonfun$clearMetadata$2.apply(DStreamGraph.scala:129) at org.apache.spark.streaming.DStreamGraph$$anonfun$clearMetadata$2.apply(DStreamGraph.scala:129) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.streaming.DStreamGraph.clearMetadata(DStreamGraph.scala:129) at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:257) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) {quote} The code is very simple: {code} val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) import sqlContext.implicits._ println(">>> create streamingContext.") val ssc = new StreamingContext(sc, Seconds(1)) ssc.queueStream( Queue( sc.makeRDD(Seq(1)), sc.makeRDD(Seq[Int]()), sc.makeRDD(Seq(2)) ), true).transform(rdd => if (rdd.isEmpty()) rdd else null).print ssc.start() {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
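A possible workaround, assumed rather than taken from the ticket: always return an RDD from the transform function, substituting an empty RDD where the original code returned null.
{code:scala}
// Workaround sketch: return sc.emptyRDD instead of null from transform.
// The transform closure runs on the driver, so referencing sc here is fine.
import scala.collection.mutable.Queue

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TransformNullWorkaround {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("transform-null-workaround"))
    val ssc = new StreamingContext(sc, Seconds(1))

    ssc.queueStream(
      Queue(sc.makeRDD(Seq(1)), sc.makeRDD(Seq.empty[Int]), sc.makeRDD(Seq(2))),
      oneAtATime = true
    ).transform(rdd => if (rdd.isEmpty()) rdd else sc.emptyRDD[Int]) // never null
     .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}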
[jira] [Updated] (SPARK-7624) Task scheduler delay is increasing time over time in spark local mode
[ https://issues.apache.org/jira/browse/SPARK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu updated SPARK-7624: --- Description: I am running a simple spark streaming program with spark 1.3.1 in local mode, it receives json string from a socket with rate 50 events per second, it can run well in first 6 hours (although the minor gc count per minute is increasing all the time), after that, i can see that the scheduler delay in every task is significant increased from 10 ms to 100 ms, after 10 hours running, the task delay is about 800 ms and cpu is also increased from 2% to 30%. This causes the steaming job can not finish in one batch interval (5 seconds). I dumped the java memory after 16 hours and can see there are about 20 {{org.apache.spark.scheduler.local.ReviveOffers}} objects in {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. Then i checked the code and see only one place may put the {{ReviveOffers}} to akka {{LightArrayRevolverScheduler}}: the {{LocalActor::reviveOffers}} {code} def reviveOffers() { val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) val tasks = scheduler.resourceOffers(offers).flatten for (task <- tasks) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, task.name, task.serializedTask) } if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) { // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers) } } {code} I removed the last three lines in this method (the whole {{if}} block, which is introduced from https://issues.apache.org/jira/browse/SPARK-4939), it worked smooth after 20 hours running, the scheduler delay is about 10 ms all the time. So there should have some conditions that the ReviveOffers will be duplicate scheduled? I am not sure why this happens, but i feel that this is the root cause of this issue. My spark settings: # Memor: 3G # CPU: 8 cores # Streaming Batch interval: 5 seconds. Here are my streaming code: {code} val input = ssc.socketTextStream( hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions( /// parse the json to Order Order(_), preservePartitioning = true) val mresult = input.map( v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache() val tempr = mresult.window( Seconds(firstStageWindowSize), Seconds(firstStageWindowSize) ).transform( rdd => rdd.union(rdd).union(rdd).union(rdd) ) tempr.count.print tempr.cache().foreachRDD((rdd, t) => { for (i <- 1 to 5) { val c = rdd.filter(x=>scala.util.Random.nextInt(5) == i).count() println("""T: """ + t + """: """ + c) } }) {code} Updated at 2015-05-15 I did print some detail schedule times of the suspect lines in {{LocalActor::reviveOffers}}: {color:red}*1685343501*{color} times after 18 hours running. was: I am running a simple spark streaming program with spark 1.3.1 in local mode, it receives json string from a socket with rate 50 events per second, it can run well in first 6 hours (although the minor gc count per minute is increasing all the time), after that, i can see that the scheduler delay in every task is significant increased from 10 ms to 100 ms, after 10 hours running, the task delay is about 800 ms and cpu is also increased from 2% to 30%. This causes the steaming job can not finish in one batch interval (5 seconds). 
I dumped the java memory after 16 hours and can see there are about 20 {{org.apache.spark.scheduler.local.ReviveOffers}} objects in {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. Then i checked the code and see only one place may put the {{ReviveOffers}} to akka {{LightArrayRevolverScheduler}}: the {{LocalActor::reviveOffers}} {code} def reviveOffers() { val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) val tasks = scheduler.resourceOffers(offers).flatten for (task <- tasks) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, task.name, task.serializedTask) } if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) { // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers) } } {code} I removed the last three lines in this method (the whole {{if}} block, which is introduced from https://issues.apache.org/jira/browse/SPARK-4939), it worked smooth after 20 hours running
[jira] [Updated] (SPARK-7624) Task scheduler delay is increasing time over time in spark local mode
[ https://issues.apache.org/jira/browse/SPARK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu updated SPARK-7624: --- Description: I am running a simple spark streaming program with spark 1.3.1 in local mode, it receives json string from a socket with rate 50 events per second, it can run well in first 6 hours (although the minor gc count per minute is increasing all the time), after that, i can see that the scheduler delay in every task is significant increased from 10 ms to 100 ms, after 10 hours running, the task delay is about 800 ms and cpu is also increased from 2% to 30%. This causes the steaming job can not finish in one batch interval (5 seconds). I dumped the java memory after 16 hours and can see there are about 20 {{org.apache.spark.scheduler.local.ReviveOffers}} objects in {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. Then i checked the code and see only one place may put the {{ReviveOffers}} to akka {{LightArrayRevolverScheduler}}: the {{LocalActor::reviveOffers}} {code} def reviveOffers() { val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) val tasks = scheduler.resourceOffers(offers).flatten for (task <- tasks) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, task.name, task.serializedTask) } if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) { // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers) } } {code} I removed the last three lines in this method (the whole {{if}} block, which is introduced from https://issues.apache.org/jira/browse/SPARK-4939), it worked smooth after 20 hours running, the scheduler delay is about 10 ms all the time. So there should have some conditions that the ReviveOffers will be duplicate scheduled? I am not sure why this happens, but i feel that this is the root cause of this issue. My spark settings: # Memor: 3G # CPU: 8 cores # Streaming Batch interval: 5 seconds. Here are my streaming code: {code} val input = ssc.socketTextStream( hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions( /// parse the json to Order Order(_), preservePartitioning = true) val mresult = input.map( v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache() val tempr = mresult.window( Seconds(firstStageWindowSize), Seconds(firstStageWindowSize) ).transform( rdd => rdd.union(rdd).union(rdd).union(rdd) ) tempr.count.print tempr.cache().foreachRDD((rdd, t) => { for (i <- 1 to 5) { val c = rdd.filter(x=>scala.util.Random.nextInt(5) == i).count() println("""T: """ + t + """: """ + c) } }) {code} Updated at 2015-05-15 I did print some detail schedule times of the suspect lines in {{LocalActor::reviveOffers}}: {color:red}1685343501{color} times after 18 hours running. was: I am running a simple spark streaming program with spark 1.3.1 in local mode, it receives json string from a socket with rate 50 events per second, it can run well in first 6 hours (although the minor gc count per minute is increasing all the time), after that, i can see that the scheduler delay in every task is significant increased from 10 ms to 100 ms, after 10 hours running, the task delay is about 800 ms and cpu is also increased from 2% to 30%. This causes the steaming job can not finish in one batch interval (5 seconds). 
I dumped the java memory after 16 hours and can see there are about 20 {{org.apache.spark.scheduler.local.ReviveOffers}} objects in {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. Then i checked the code and see only one place may put the {{ReviveOffers}} to akka {{LightArrayRevolverScheduler}}: the {{LocalActor::reviveOffers}} {code} def reviveOffers() { val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) val tasks = scheduler.resourceOffers(offers).flatten for (task <- tasks) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, task.name, task.serializedTask) } if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) { // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers) } } {code} I removed the last three lines in this method (the whole {{if}} block, which is introduced from https://issues.apache.org/jira/browse/SPARK-4939), it worked smooth after 20 hours running,
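To make the suspected mechanism concrete, here is a self-contained toy model, in plain Scala with no Spark classes, of what the report above hypothesizes: if the delayed revive is scheduled again on every trigger while earlier ones are still pending, {{ReviveOffers}}-style messages accumulate in the timer queue. This is only an illustration of the hypothesis, not code from Spark; the names ({{ReviveOffersPileUp}}, {{reviveLater}}) are made up for the sketch.

{code}
// Self-contained toy (not Spark code): every call re-schedules a delayed "revive"
// without checking whether one is already pending, so pending messages pile up.
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

object ReviveOffersPileUp {
  def main(args: Array[String]): Unit = {
    val timer = Executors.newSingleThreadScheduledExecutor()
    val pending = new AtomicLong(0)

    // Mirrors scheduleOnce(1000 millis, self, ReviveOffers) being reached on every
    // empty resource offer, with no de-duplication of the delayed message.
    def reviveLater(): Unit = {
      pending.incrementAndGet()
      timer.schedule(new Runnable {
        def run(): Unit = { pending.decrementAndGet(); () }   // the delayed revive fires
      }, 1000, TimeUnit.MILLISECONDS)
    }

    for (_ <- 1 to 10000) reviveLater()
    println(s"Delayed revive messages still queued right after the burst: ${pending.get()}")
    timer.shutdown()
  }
}
{code}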
[jira] [Updated] (SPARK-7624) Task scheduler delay is increasing time over time in spark local mode
[ https://issues.apache.org/jira/browse/SPARK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu updated SPARK-7624: --- Description: I am running a simple spark streaming program with spark 1.3.1 in local mode, it receives json string from a socket with rate 50 events per second, it can run well in first 6 hours (although the minor gc count per minute is increasing all the time), after that, i can see that the scheduler delay in every task is significant increased from 10 ms to 100 ms, after 10 hours running, the task delay is about 800 ms and cpu is also increased from 2% to 30%. This causes the steaming job can not finish in one batch interval (5 seconds). I dumped the java memory after 16 hours and can see there are about 20 {{org.apache.spark.scheduler.local.ReviveOffers}} objects in {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. Then i checked the code and see only one place may put the {{ReviveOffers}} to akka {{LightArrayRevolverScheduler}}: the {{LocalActor::reviveOffers}} {code} def reviveOffers() { val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) val tasks = scheduler.resourceOffers(offers).flatten for (task <- tasks) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, task.name, task.serializedTask) } if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) { // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers) } } {code} I removed the last three lines in this method (the whole {{if}} block, which is introduced from https://issues.apache.org/jira/browse/SPARK-4939), it worked smooth after 20 hours running, the scheduler delay is about 10 ms all the time. So there should have some conditions that the ReviveOffers will be duplicate scheduled? I am not sure why this happens, but i feel that this is the root cause of this issue. My spark settings: # Memor: 3G # CPU: 8 cores # Streaming Batch interval: 5 seconds. Here are my streaming code: {code} val input = ssc.socketTextStream( hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions( /// parse the json to Order Order(_), preservePartitioning = true) val mresult = input.map( v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache() val tempr = mresult.window( Seconds(firstStageWindowSize), Seconds(firstStageWindowSize) ).transform( rdd => rdd.union(rdd).union(rdd).union(rdd) ) tempr.count.print tempr.cache().foreachRDD((rdd, t) => { for (i <- 1 to 5) { val c = rdd.filter(x=>scala.util.Random.nextInt(5) == i).count() println("""T: """ + t + """: """ + c) } }) {code} was: I am running a simple spark streaming program with spark 1.3.1 in local mode, it receives json string from a socket with rate 50 events per second, it can run well in first 6 hours (although the minor gc count per minute is increasing all the time), after that, i can see that the scheduler delay in every task is significant increased from 10 ms to 100 ms, after 10 hours running, the task delay is about 800 ms and cpu is also increased from 2% to 30%. This causes the steaming job can not finish in one batch interval (5 seconds). I dumped the java memory after 16 hours and can see there are about 20 {{org.apache.spark.scheduler.local.ReviveOffers}} objects in {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. 
Then i checked the code and see only one place may put the {{ReviveOffers}} to akka {{LightArrayRevolverScheduler}}: the {{LocalActor::reviveOffers}} {code} def reviveOffers() { val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) val tasks = scheduler.resourceOffers(offers).flatten for (task <- tasks) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, task.name, task.serializedTask) } if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) { // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers) } {code} I removed the last three lines in this method (the whole {{if}} block, which is introduced from https://issues.apache.org/jira/browse/SPARK-4939), it worked smooth after 20 hours running, the scheduler delay is about 10 ms all the time. So there should have some conditions that the ReviveOffers will be duplicate scheduled? I am not sure why this happens, but i feel that this is the root cause of this issue. My spark
[jira] [Created] (SPARK-7624) Task scheduler delay is increasing time over time in spark local mode
Jack Hu created SPARK-7624: -- Summary: Task scheduler delay is increasing time over time in spark local mode Key: SPARK-7624 URL: https://issues.apache.org/jira/browse/SPARK-7624 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.3.1 Reporter: Jack Hu
I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. It receives JSON strings from a socket at a rate of 50 events per second. It runs well for the first 6 hours (although the minor GC count per minute keeps increasing), but after that the scheduler delay of every task increases significantly, from 10 ms to 100 ms; after 10 hours of running the task delay is about 800 ms and CPU usage has also grown from 2% to 30%. As a result the streaming job can no longer finish within one batch interval (5 seconds).

I dumped the Java heap after 16 hours and can see about 20 {{org.apache.spark.scheduler.local.ReviveOffers}} objects in {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. I then checked the code and found only one place that may put {{ReviveOffers}} into the akka {{LightArrayRevolverScheduler}}: {{LocalActor::reviveOffers}}

{code}
def reviveOffers() {
  val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
  val tasks = scheduler.resourceOffers(offers).flatten
  for (task <- tasks) {
    freeCores -= scheduler.CPUS_PER_TASK
    executor.launchTask(executorBackend, taskId = task.taskId,
      attemptNumber = task.attemptNumber, task.name, task.serializedTask)
  }
  if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
    // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
    context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
  }
}
{code}

I removed the last three lines of this method (the whole {{if}} block, introduced by https://issues.apache.org/jira/browse/SPARK-4939), and the job then ran smoothly for 20 hours with the scheduler delay staying around 10 ms the whole time. So there must be some condition under which {{ReviveOffers}} gets scheduled more than once. I am not sure why this happens, but I believe it is the root cause of this issue.

My Spark settings:
# Memory: 3G
# CPU: 8 cores
# Streaming batch interval: 5 seconds

Here is my streaming code:

{code}
val input = ssc.socketTextStream(
    hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions(
  /// parse the json to Order
  Order(_), preservePartitioning = true)
val mresult = input.map(
  v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache()
val tempr = mresult.window(
    Seconds(firstStageWindowSize), Seconds(firstStageWindowSize)
  ).transform(
    rdd => rdd.union(rdd).union(rdd).union(rdd)
  )
tempr.count.print
tempr.cache().foreachRDD((rdd, t) => {
  for (i <- 1 to 5) {
    val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
    println("""T: """ + t + """: """ + c)
  }
})
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
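If the duplicate-scheduling hypothesis above holds, one conceivable mitigation, sketched below purely for illustration (it is not the change that went into Spark, and it uses a plain {{ScheduledExecutorService}} instead of the akka scheduler), is to guard the delayed revive so that at most one is pending at any time.

{code}
// Hypothetical guard (sketch only): allow at most one pending delayed revive.
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object GuardedRevive {
  private val timer = Executors.newSingleThreadScheduledExecutor()
  private val revivePending = new AtomicBoolean(false)

  // Called wherever the original code called scheduleOnce(1000 millis, self, ReviveOffers).
  def scheduleReviveOnce(onRevive: () => Unit): Unit = {
    if (revivePending.compareAndSet(false, true)) {
      timer.schedule(new Runnable {
        def run(): Unit = {
          revivePending.set(false)
          onRevive() // e.g. the body of LocalActor.reviveOffers
        }
      }, 1000, TimeUnit.MILLISECONDS)
    }
  }

  def main(args: Array[String]): Unit = {
    // Even if many events ask for a revive within the same second, only one is queued.
    for (_ <- 1 to 1000) scheduleReviveOnce(() => println("revive"))
    Thread.sleep(1500)
    timer.shutdown()
  }
}
{code}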
[jira] [Commented] (SPARK-7183) Memory leak in netty shuffle with spark standalone cluster
[ https://issues.apache.org/jira/browse/SPARK-7183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14532108#comment-14532108 ] Jack Hu commented on SPARK-7183: Hi, [~sowen] Do we plan to add this to 1.3+? If there is any plan to release more minor release for 1.3+ like 1.3.2. > Memory leak in netty shuffle with spark standalone cluster > -- > > Key: SPARK-7183 > URL: https://issues.apache.org/jira/browse/SPARK-7183 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 1.3.0 >Reporter: Jack Hu >Assignee: Liang-Chi Hsieh > Labels: memory-leak, netty, shuffle > Fix For: 1.4.0 > > > There is slow leak in netty shuffle with spark cluster in > {{TransportRequestHandler.streamIds}} > In spark cluster, there are some reusable netty connections between two block > managers to get/send blocks between worker/drivers. These connections are > handled by the {{org.apache.spark.network.server.TransportRequestHandler}} in > server side. This handler keep tracking all the streamids negotiate by RPC > when shuffle data need transform in these two block managers and the streamid > is keeping increasing, and never get a chance to be deleted exception this > connection is dropped (seems never happen in normal running). > Here are some detail logs of this {{TransportRequestHandler}} (Note: we add > a log a print the total size of {{TransportRequestHandler.streamIds}}, the > log is "Current set size is N of > org.apache.spark.network.server.TransportRequestHandler@ADDRESS", this set > size is keeping increasing in our test) > {quote} > 15/04/22 21:00:16 DEBUG TransportServer: Shuffle server started on port :46288 > 15/04/22 21:00:16 INFO NettyBlockTransferService: Server created on 46288 > 15/04/22 21:00:31 INFO TransportRequestHandler: Created > TransportRequestHandler > org.apache.spark.network.server.TransportRequestHandler@29a4f3e7 > 15/04/22 21:00:32 TRACE MessageDecoder: Received message RpcRequest: > RpcRequest\{requestId=6655045571437304938, message=[B@59778678\} > 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Received request: > OpenBlocks\{appId=app-20150422210016-, execId=, > blockIds=[broadcast_1_piece0]} > 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Registered streamId > 1387459488000 with 1 buffers > 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result > RpcResponse\{requestId=6655045571437304938, response=[B@d2840b\} to client > /10.111.7.150:33802 > 15/04/22 21:00:33 TRACE MessageDecoder: Received message ChunkFetchRequest: > ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488000, > chunkIndex=0}} > 15/04/22 21:00:33 TRACE TransportRequestHandler: Received req from > /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488000, > chunkIndex=0\} > 15/04/22 21:00:33 INFO TransportRequestHandler: Current set size is 1 of > org.apache.spark.network.server.TransportRequestHandler@29a4f3e7 > 15/04/22 21:00:33 TRACE OneForOneStreamManager: Removing stream id > 1387459488000 > 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result > ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488000, > chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 > lim=3839 cap=3839]}} to client /10.111.7.150:33802 > 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: > RpcRequest\{requestId=6660601528868866371, message=[B@42bed1b8\} > 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Received request: > OpenBlocks\{appId=app-20150422210016-, execId=, > 
blockIds=[broadcast_3_piece0]} > 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Registered streamId > 1387459488001 with 1 buffers > 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result > RpcResponse\{requestId=6660601528868866371, response=[B@7fa3fb60\} to client > /10.111.7.150:33802 > 15/04/22 21:00:34 TRACE MessageDecoder: Received message ChunkFetchRequest: > ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488001, > chunkIndex=0}} > 15/04/22 21:00:34 TRACE TransportRequestHandler: Received req from > /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488001, > chunkIndex=0\} > 15/04/22 21:00:34 INFO TransportRequestHandler: Current set size is 2 of > org.apache.spark.network.server.TransportRequestHandler@29a4f3e7 > 15/04/22 21:00:34 TRACE OneForOneStreamManager: Removing stream id > 1387459488001 > 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result > ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488001, > chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 > lim=4277 cap=4277]}} to client /10.111.7.150:33802 > 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: > RpcReq
[jira] [Updated] (SPARK-7183) Memory leak in netty shuffle with spark standalone cluster
[ https://issues.apache.org/jira/browse/SPARK-7183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu updated SPARK-7183: --- Summary: Memory leak in netty shuffle with spark standalone cluster (was: Memory leak with netty shuffle with spark standalone cluster) > Memory leak in netty shuffle with spark standalone cluster > -- > > Key: SPARK-7183 > URL: https://issues.apache.org/jira/browse/SPARK-7183 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: memory-leak, netty, shuffle > > There is slow leak in netty shuffle with spark cluster in > {{TransportRequestHandler.streamIds}} > In spark cluster, there are some reusable netty connections between two block > managers to get/send blocks between worker/drivers. These connections are > handled by the {{org.apache.spark.network.server.TransportRequestHandler}} in > server side. This handler keep tracking all the streamids negotiate by RPC > when shuffle data need transform in these two block managers and the streamid > is keeping increasing, and never get a chance to be deleted exception this > connection is dropped (seems never happen in normal running). > Here are some detail logs of this {{TransportRequestHandler}} (Note: we add > a log a print the total size of {{TransportRequestHandler.streamIds}}, the > log is "Current set size is N of > org.apache.spark.network.server.TransportRequestHandler@ADDRESS", this set > size is keeping increasing in our test) > {quote} > 15/04/22 21:00:16 DEBUG TransportServer: Shuffle server started on port :46288 > 15/04/22 21:00:16 INFO NettyBlockTransferService: Server created on 46288 > 15/04/22 21:00:31 INFO TransportRequestHandler: Created > TransportRequestHandler > org.apache.spark.network.server.TransportRequestHandler@29a4f3e7 > 15/04/22 21:00:32 TRACE MessageDecoder: Received message RpcRequest: > RpcRequest\{requestId=6655045571437304938, message=[B@59778678\} > 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Received request: > OpenBlocks\{appId=app-20150422210016-, execId=, > blockIds=[broadcast_1_piece0]} > 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Registered streamId > 1387459488000 with 1 buffers > 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result > RpcResponse\{requestId=6655045571437304938, response=[B@d2840b\} to client > /10.111.7.150:33802 > 15/04/22 21:00:33 TRACE MessageDecoder: Received message ChunkFetchRequest: > ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488000, > chunkIndex=0}} > 15/04/22 21:00:33 TRACE TransportRequestHandler: Received req from > /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488000, > chunkIndex=0\} > 15/04/22 21:00:33 INFO TransportRequestHandler: Current set size is 1 of > org.apache.spark.network.server.TransportRequestHandler@29a4f3e7 > 15/04/22 21:00:33 TRACE OneForOneStreamManager: Removing stream id > 1387459488000 > 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result > ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488000, > chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 > lim=3839 cap=3839]}} to client /10.111.7.150:33802 > 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: > RpcRequest\{requestId=6660601528868866371, message=[B@42bed1b8\} > 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Received request: > OpenBlocks\{appId=app-20150422210016-, execId=, > blockIds=[broadcast_3_piece0]} > 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Registered streamId 
> 1387459488001 with 1 buffers > 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result > RpcResponse\{requestId=6660601528868866371, response=[B@7fa3fb60\} to client > /10.111.7.150:33802 > 15/04/22 21:00:34 TRACE MessageDecoder: Received message ChunkFetchRequest: > ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488001, > chunkIndex=0}} > 15/04/22 21:00:34 TRACE TransportRequestHandler: Received req from > /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488001, > chunkIndex=0\} > 15/04/22 21:00:34 INFO TransportRequestHandler: Current set size is 2 of > org.apache.spark.network.server.TransportRequestHandler@29a4f3e7 > 15/04/22 21:00:34 TRACE OneForOneStreamManager: Removing stream id > 1387459488001 > 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result > ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488001, > chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 > lim=4277 cap=4277]}} to client /10.111.7.150:33802 > 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: > RpcRequest\{requestId=8454597410163901330, message=[B@19c673d1\} > 15/04/22 21:00:34 TRACE NettyBlockRpcServer: R
[jira] [Created] (SPARK-7183) Memory leak with netty shuffle with spark standalone cluster
Jack Hu created SPARK-7183: -- Summary: Memory leak with netty shuffle with spark standalone cluster Key: SPARK-7183 URL: https://issues.apache.org/jira/browse/SPARK-7183 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.3.0 Reporter: Jack Hu There is slow leak in netty shuffle with spark cluster in {{TransportRequestHandler.streamIds}} In spark cluster, there are some reusable netty connections between two block managers to get/send blocks between worker/drivers. These connections are handled by the {{org.apache.spark.network.server.TransportRequestHandler}} in server side. This handler keep tracking all the streamids negotiate by RPC when shuffle data need transform in these two block managers and the streamid is keeping increasing, and never get a chance to be deleted exception this connection is dropped (seems never happen in normal running). Here are some detail logs of this {{TransportRequestHandler}} (Note: we add a log a print the total size of {{TransportRequestHandler.streamIds}}, the log is "Current set size is N of org.apache.spark.network.server.TransportRequestHandler@ADDRESS", this set size is keeping increasing in our test) {quote} 15/04/22 21:00:16 DEBUG TransportServer: Shuffle server started on port :46288 15/04/22 21:00:16 INFO NettyBlockTransferService: Server created on 46288 15/04/22 21:00:31 INFO TransportRequestHandler: Created TransportRequestHandler org.apache.spark.network.server.TransportRequestHandler@29a4f3e7 15/04/22 21:00:32 TRACE MessageDecoder: Received message RpcRequest: RpcRequest\{requestId=6655045571437304938, message=[B@59778678\} 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Received request: OpenBlocks\{appId=app-20150422210016-, execId=, blockIds=[broadcast_1_piece0]} 15/04/22 21:00:32 TRACE NettyBlockRpcServer: Registered streamId 1387459488000 with 1 buffers 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result RpcResponse\{requestId=6655045571437304938, response=[B@d2840b\} to client /10.111.7.150:33802 15/04/22 21:00:33 TRACE MessageDecoder: Received message ChunkFetchRequest: ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488000, chunkIndex=0}} 15/04/22 21:00:33 TRACE TransportRequestHandler: Received req from /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488000, chunkIndex=0\} 15/04/22 21:00:33 INFO TransportRequestHandler: Current set size is 1 of org.apache.spark.network.server.TransportRequestHandler@29a4f3e7 15/04/22 21:00:33 TRACE OneForOneStreamManager: Removing stream id 1387459488000 15/04/22 21:00:33 TRACE TransportRequestHandler: Sent result ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488000, chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 lim=3839 cap=3839]}} to client /10.111.7.150:33802 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: RpcRequest\{requestId=6660601528868866371, message=[B@42bed1b8\} 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Received request: OpenBlocks\{appId=app-20150422210016-, execId=, blockIds=[broadcast_3_piece0]} 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Registered streamId 1387459488001 with 1 buffers 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result RpcResponse\{requestId=6660601528868866371, response=[B@7fa3fb60\} to client /10.111.7.150:33802 15/04/22 21:00:34 TRACE MessageDecoder: Received message ChunkFetchRequest: ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488001, chunkIndex=0}} 15/04/22 21:00:34 TRACE TransportRequestHandler: 
Received req from /10.111.7.150:33802 to fetch block StreamChunkId\{streamId=1387459488001, chunkIndex=0\} 15/04/22 21:00:34 INFO TransportRequestHandler: Current set size is 2 of org.apache.spark.network.server.TransportRequestHandler@29a4f3e7 15/04/22 21:00:34 TRACE OneForOneStreamManager: Removing stream id 1387459488001 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result ChunkFetchSuccess\{streamChunkId=StreamChunkId\{streamId=1387459488001, chunkIndex=0}, buffer=NioManagedBuffer\{buf=java.nio.HeapByteBuffer[pos=0 lim=4277 cap=4277]}} to client /10.111.7.150:33802 15/04/22 21:00:34 TRACE MessageDecoder: Received message RpcRequest: RpcRequest\{requestId=8454597410163901330, message=[B@19c673d1\} 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Received request: OpenBlocks\{appId=app-20150422210016-, execId=, blockIds=[broadcast_2_piece0]} 15/04/22 21:00:34 TRACE NettyBlockRpcServer: Registered streamId 1387459488002 with 1 buffers 15/04/22 21:00:34 TRACE TransportRequestHandler: Sent result RpcResponse\{requestId=8454597410163901330, response=[B@35dbdac2\} to client /10.111.7.150:33802 15/04/22 21:00:34 TRACE MessageDecoder: Received message ChunkFetchRequest: ChunkFetchRequest\{streamChunkId=StreamChunkId\{streamId=1387459488002, c
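The leak described above can be reduced to a small self-contained sketch (plain Scala, not Spark's network module): a long-lived, reusable connection handler records one stream id per block request, so unless ids are removed once their stream has been fully served, the tracking set grows for the whole lifetime of the connection. The class and method names here ({{ConnectionHandler}}, {{streamFinished}}) are invented for the illustration.

{code}
import scala.collection.mutable

// Toy model of the pattern, not Spark's TransportRequestHandler.
class ConnectionHandler {
  private val streamIds = mutable.Set.empty[Long]   // one entry per OpenBlocks-style request
  private var nextStreamId = 0L

  def registerStream(): Long = {
    nextStreamId += 1
    streamIds += nextStreamId
    nextStreamId
  }

  // Without a call like this when the last chunk of a stream has been served,
  // streamIds only ever grows while the connection is reused.
  def streamFinished(id: Long): Unit = streamIds -= id

  def trackedStreams: Int = streamIds.size
}

object StreamIdLeakDemo extends App {
  val handler = new ConnectionHandler
  for (_ <- 1 to 100000) {
    val id = handler.registerStream()
    // ... serve the chunks of stream `id` ...
    // handler.streamFinished(id)   // commented out: the set keeps growing, as in the logs above
  }
  println(s"Stream ids still tracked by one connection: ${handler.trackedStreams}")
}
{code}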
[jira] [Commented] (SPARK-6844) Memory leak occurs when register temp table with cache table on
[ https://issues.apache.org/jira/browse/SPARK-6844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14508597#comment-14508597 ] Jack Hu commented on SPARK-6844: Hi, [~marmbrus]] I mean 1.3.X, like 1.3.2. The master seems not much different with branch 1.3 (May be i am wrong) > Memory leak occurs when register temp table with cache table on > --- > > Key: SPARK-6844 > URL: https://issues.apache.org/jira/browse/SPARK-6844 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: Memory, SQL > Fix For: 1.4.0 > > > There is a memory leak in register temp table with cache on > This is the simple code to reproduce this issue: > {code} > val sparkConf = new SparkConf().setAppName("LeakTest") > val sparkContext = new SparkContext(sparkConf) > val sqlContext = new SQLContext(sparkContext) > val tableName = "tmp" > val jsonrdd = sparkContext.textFile("""sample.json""") > var loopCount = 1L > while(true) { > sqlContext.jsonRDD(jsonrdd).registerTempTable(tableName) > sqlContext.cacheTable(tableName) > println("L: " +loopCount + " R:" + sqlContext.sql("""select count(*) > from tmp""").count()) > sqlContext.uncacheTable(tableName) > loopCount += 1 > } > {code} > The cause is that the {{InMemoryRelation}}. {{InMemoryColumnarTableScan}} > uses the accumulator > ({{InMemoryRelation.batchStats}},{{InMemoryColumnarTableScan.readPartitions}}, > {{InMemoryColumnarTableScan.readBatches}} ) to get some information from > partitions or for test. These accumulators will register itself into a static > map in {{Accumulators.originals}} and never get cleaned up. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6844) Memory leak occurs when register temp table with cache table on
[ https://issues.apache.org/jira/browse/SPARK-6844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497627#comment-14497627 ] Jack Hu commented on SPARK-6844: Hi, [~marmbrus] Do we have a plan to port this to 1.3.X branch? > Memory leak occurs when register temp table with cache table on > --- > > Key: SPARK-6844 > URL: https://issues.apache.org/jira/browse/SPARK-6844 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: Memory, SQL > Fix For: 1.4.0 > > > There is a memory leak in register temp table with cache on > This is the simple code to reproduce this issue: > {code} > val sparkConf = new SparkConf().setAppName("LeakTest") > val sparkContext = new SparkContext(sparkConf) > val sqlContext = new SQLContext(sparkContext) > val tableName = "tmp" > val jsonrdd = sparkContext.textFile("""sample.json""") > var loopCount = 1L > while(true) { > sqlContext.jsonRDD(jsonrdd).registerTempTable(tableName) > sqlContext.cacheTable(tableName) > println("L: " +loopCount + " R:" + sqlContext.sql("""select count(*) > from tmp""").count()) > sqlContext.uncacheTable(tableName) > loopCount += 1 > } > {code} > The cause is that the {{InMemoryRelation}}. {{InMemoryColumnarTableScan}} > uses the accumulator > ({{InMemoryRelation.batchStats}},{{InMemoryColumnarTableScan.readPartitions}}, > {{InMemoryColumnarTableScan.readBatches}} ) to get some information from > partitions or for test. These accumulators will register itself into a static > map in {{Accumulators.originals}} and never get cleaned up. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
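The cause quoted above is essentially the classic "objects register themselves in a static map that nothing cleans" pattern. Below is a self-contained toy model of it in plain Scala; {{Registry}} and {{BatchStats}} are invented stand-ins for {{Accumulators.originals}} and the accumulators created by {{InMemoryRelation}} / {{InMemoryColumnarTableScan}}, so this is an illustration of the mechanism, not Spark code.

{code}
import scala.collection.mutable

object Registry {                                    // stands in for Accumulators.originals
  val originals = mutable.Map.empty[Long, AnyRef]
  private var nextId = 0L
  def register(a: AnyRef): Long = { nextId += 1; originals(nextId) = a; nextId }
}

final class BatchStats {                             // stands in for InMemoryRelation.batchStats
  val id: Long = Registry.register(this)             // registers itself, never unregistered
}

object AccumulatorLeakDemo extends App {
  for (_ <- 1 to 100000) {
    new BatchStats()                                 // one per registerTempTable + cacheTable loop
    // uncacheTable(...) does not remove the entry, so the static map never shrinks
  }
  println(s"Entries held by the static registry: ${Registry.originals.size}")
}
{code}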
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495914#comment-14495914 ] Jack Hu commented on SPARK-6847: I did a little more investigation into this issue. It appears to be a problem with operations that must be check-pointed ({{updateStateByKey}}, {{reduceByKeyAndWindow}} with an inverse-reduce function) that are followed by another check-pointed operation (either one with a manually added checkpoint, as in the code in this JIRA's description, or another operation that must be check-pointed), where the checkpoint intervals of the two operations are the same (or the downstream operation's checkpoint interval equals the batch interval).

The following code will have this issue (assume the default batch interval is 2 seconds and the default checkpoint interval is 10 seconds):
# {{source.updateStateByKey(func).map(f).checkpoint(10 seconds)}}
# {{source.updateStateByKey(func).map(f).updateStateByKey(func2)}}
# {{source.updateStateByKey(func).map(f).checkpoint(2 seconds)}}

These DO NOT have this issue:
# {{source.updateStateByKey(func).map(f).checkpoint(4 seconds)}}
# {{source.updateStateByKey(func).map(f).updateStateByKey(func2).checkpoint(4 seconds)}}

These samples generate an RDD graph in which two RDDs need to be check-pointed. If the child RDD(s) also need to checkpoint at the same time as the parent, then the parent will not checkpoint, according to {{rdd.doCheckpoint}}. In that case the RDD produced by {{updateStateByKey}} never gets check-pointed in the failing samples, which leads to the stack overflow ({{updateStateByKey}} needs checkpointing to break the dependency chain). If the child RDD(s) are not always check-pointed at the same time as the parent, the parent RDD (the one from {{updateStateByKey}}) has a chance to complete some checkpoints and break the dependency, even if with some delay, so no stack overflow happens.

So, for now, we work around this issue in our streaming project by setting the checkpoint intervals to different values whenever we use operations that must be check-pointed.
Maybe this is not a easy fix here, hope we can add some validation at least > Stack overflow on updateStateByKey which followed by a dstream with > checkpoint set > -- > > Key: SPARK-6847 > URL: https://issues.apache.org/jira/browse/SPARK-6847 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: StackOverflowError, Streaming > > The issue happens with the following sample code: uses {{updateStateByKey}} > followed by a {{map}} with checkpoint interval 10 seconds > {code} > val sparkConf = new SparkConf().setAppName("test") > val streamingContext = new StreamingContext(sparkConf, Seconds(10)) > streamingContext.checkpoint("""checkpoint""") > val source = streamingContext.socketTextStream("localhost", ) > val updatedResult = source.map( > (1,_)).updateStateByKey( > (newlist : Seq[String], oldstate : Option[String]) => > newlist.headOption.orElse(oldstate)) > updatedResult.map(_._2) > .checkpoint(Seconds(10)) > .foreachRDD((rdd, t) => { > println("Deep: " + rdd.toDebugString.split("\n").length) > println(t.toString() + ": " + rdd.collect.length) > }) > streamingContext.start() > streamingContext.awaitTermination() > {code} > From the output, we can see that the dependency will be increasing time over > time, the {{updateStateByKey}} never get check-pointed, and finally, the > stack overflow will happen. > Note: > * The rdd in {{updatedResult.map(_._2)}} get check-pointed in this case, but > not the {{updateStateByKey}} > * If remove the {{checkpoint(Seconds(10))}} from the map result ( > {{updatedResult.map(_._2)}} ), the stack overflow will not happen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
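A minimal sketch of the workaround described above, modelled on the non-failing combinations listed in the comment and on the sample job from the issue description (assuming a 2 second batch interval; the socket host and port are placeholders): give the downstream {{checkpoint}} an interval different from the one used for the stateful stream.

{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointIntervalWorkaround {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("test"), Seconds(2))
    ssc.checkpoint("checkpoint")

    val source = ssc.socketTextStream("localhost", 9999)   // placeholder host/port
    val updatedResult = source.map((1, _)).updateStateByKey(
      (newlist: Seq[String], oldstate: Option[String]) => newlist.headOption.orElse(oldstate))

    updatedResult.map(_._2)
      .checkpoint(Seconds(4))   // differs from the state stream's checkpoint interval
      .foreachRDD((rdd, t) => println(t.toString + ": " + rdd.collect().length))

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}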
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493804#comment-14493804 ] Jack Hu commented on SPARK-6847: Hi, [~sowen] The checkpoint interval can not be turn down (smaller than 10 seconds) since it must be bigger or equal than the batch interval. I will try to more checkpoint interval like 20 seconds, 30 seconds... We have a real case that has the same problem, it only updates small set of values per key per interval (one event per key per interval) One observation is that: the {{updateStateByKey}} is automatically checkpointed > Stack overflow on updateStateByKey which followed by a dstream with > checkpoint set > -- > > Key: SPARK-6847 > URL: https://issues.apache.org/jira/browse/SPARK-6847 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: StackOverflowError, Streaming > > The issue happens with the following sample code: uses {{updateStateByKey}} > followed by a {{map}} with checkpoint interval 10 seconds > {code} > val sparkConf = new SparkConf().setAppName("test") > val streamingContext = new StreamingContext(sparkConf, Seconds(10)) > streamingContext.checkpoint("""checkpoint""") > val source = streamingContext.socketTextStream("localhost", ) > val updatedResult = source.map( > (1,_)).updateStateByKey( > (newlist : Seq[String], oldstate : Option[String]) => > newlist.headOption.orElse(oldstate)) > updatedResult.map(_._2) > .checkpoint(Seconds(10)) > .foreachRDD((rdd, t) => { > println("Deep: " + rdd.toDebugString.split("\n").length) > println(t.toString() + ": " + rdd.collect.length) > }) > streamingContext.start() > streamingContext.awaitTermination() > {code} > From the output, we can see that the dependency will be increasing time over > time, the {{updateStateByKey}} never get check-pointed, and finally, the > stack overflow will happen. > Note: > * The rdd in {{updatedResult.map(_._2)}} get check-pointed in this case, but > not the {{updateStateByKey}} > * If remove the {{checkpoint(Seconds(10))}} from the map result ( > {{updatedResult.map(_._2)}} ), the stack overflow will not happen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493517#comment-14493517 ] Jack Hu commented on SPARK-6847: Here is the part of the stack (Full stack at: https://gist.github.com/jhu-chang/38a6c052aff1d666b785) {quote} 15/04/14 11:28:20 [Executor task launch worker-1] ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 27554.0 (TID 3801) java.lang.StackOverflowError at java.io.ObjectStreamClass.setPrimFieldValues(ObjectStreamClass.java:1243) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1984) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at scala.collection.immutable.$colon$colon.readObject(List.scala:366) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) {quote} > Stack overflow on updateStateByKey which followed by a dstream with > checkpoint set > -- > > Key: SPARK-6847 > URL: https://issues.apache.org/jira/browse/SPARK-6847 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: StackOverflowError, Streaming > > The issue happens with the following sample code: uses {{updateStateByKey}} > followed by a {{map}} wit
[jira] [Comment Edited] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14491873#comment-14491873 ] Jack Hu edited comment on SPARK-6847 at 4/13/15 3:34 AM: - Hi, [~sowen] I tested more cases: # only change the {{newlist.headOption.orElse(oldstate)}} to {{Some("a")}}, the issue still exists # only change the streaming batch interval to {{2 seconds}}, keep the {{newlist.headOption.orElse(oldstate)}} and checkpoint interval 10 seconds, the issue does not exist. So this issue may be related to the checkpoint interval and batch interval. was (Author: jhu): Hi, [~sowen] I tested more cases: # only change the {{newlist.headOption.orElse(oldstate)}} to {{Some("a")}}, the issue still exists # only change the streaming batch interval to {{2 seconds}}, keep the {{newlist.headOption.orElse(oldstate)}} and checkpoint interval 10 seconds, the issue does not exist. So this issue may related to the checkpoint interval and batch interval. > Stack overflow on updateStateByKey which followed by a dstream with > checkpoint set > -- > > Key: SPARK-6847 > URL: https://issues.apache.org/jira/browse/SPARK-6847 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: StackOverflowError, Streaming > > The issue happens with the following sample code: uses {{updateStateByKey}} > followed by a {{map}} with checkpoint interval 10 seconds > {code} > val sparkConf = new SparkConf().setAppName("test") > val streamingContext = new StreamingContext(sparkConf, Seconds(10)) > streamingContext.checkpoint("""checkpoint""") > val source = streamingContext.socketTextStream("localhost", ) > val updatedResult = source.map( > (1,_)).updateStateByKey( > (newlist : Seq[String], oldstate : Option[String]) => > newlist.headOption.orElse(oldstate)) > updatedResult.map(_._2) > .checkpoint(Seconds(10)) > .foreachRDD((rdd, t) => { > println("Deep: " + rdd.toDebugString.split("\n").length) > println(t.toString() + ": " + rdd.collect.length) > }) > streamingContext.start() > streamingContext.awaitTermination() > {code} > From the output, we can see that the dependency will be increasing time over > time, the {{updateStateByKey}} never get check-pointed, and finally, the > stack overflow will happen. > Note: > * The rdd in {{updatedResult.map(_._2)}} get check-pointed in this case, but > not the {{updateStateByKey}} > * If remove the {{checkpoint(Seconds(10))}} from the map result ( > {{updatedResult.map(_._2)}} ), the stack overflow will not happen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14491873#comment-14491873 ] Jack Hu commented on SPARK-6847: Hi, [~sowen] I tested more cases: # only change the {{newlist.headOption.orElse(oldstate)}} to {{Some("a")}}, the issue still exists # only change the streaming batch interval to {{2 seconds}}, keep the {{newlist.headOption.orElse(oldstate)}} and checkpoint interval 10 seconds, the issue does not exist. So this issue may related to the checkpoint interval and batch interval. > Stack overflow on updateStateByKey which followed by a dstream with > checkpoint set > -- > > Key: SPARK-6847 > URL: https://issues.apache.org/jira/browse/SPARK-6847 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Jack Hu > Labels: StackOverflowError, Streaming > > The issue happens with the following sample code: uses {{updateStateByKey}} > followed by a {{map}} with checkpoint interval 10 seconds > {code} > val sparkConf = new SparkConf().setAppName("test") > val streamingContext = new StreamingContext(sparkConf, Seconds(10)) > streamingContext.checkpoint("""checkpoint""") > val source = streamingContext.socketTextStream("localhost", ) > val updatedResult = source.map( > (1,_)).updateStateByKey( > (newlist : Seq[String], oldstate : Option[String]) => > newlist.headOption.orElse(oldstate)) > updatedResult.map(_._2) > .checkpoint(Seconds(10)) > .foreachRDD((rdd, t) => { > println("Deep: " + rdd.toDebugString.split("\n").length) > println(t.toString() + ": " + rdd.collect.length) > }) > streamingContext.start() > streamingContext.awaitTermination() > {code} > From the output, we can see that the dependency will be increasing time over > time, the {{updateStateByKey}} never get check-pointed, and finally, the > stack overflow will happen. > Note: > * The rdd in {{updatedResult.map(_._2)}} get check-pointed in this case, but > not the {{updateStateByKey}} > * If remove the {{checkpoint(Seconds(10))}} from the map result ( > {{updatedResult.map(_._2)}} ), the stack overflow will not happen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-6180) Error logged into log4j when use the HiveMetastoreCatalog::tableExists
[ https://issues.apache.org/jira/browse/SPARK-6180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu resolved SPARK-6180. Resolution: Fixed Fix Version/s: 1.3.1 Fixed in this pull https://github.com/apache/spark/pull/4365 > Error logged into log4j when use the HiveMetastoreCatalog::tableExists > -- > > Key: SPARK-6180 > URL: https://issues.apache.org/jira/browse/SPARK-6180 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.2.1 >Reporter: Jack Hu >Priority: Minor > Labels: Hive, HiveMetastoreCatalog, spark, starter > Fix For: 1.3.1 > > > When using {{HiveMetastoreCatalog.tableExists}} to check a table that does > not exist in hive store, there is one error message like this logged into log > file, the function returns {{false}} as desired. > To avoid this error log, one way is to use {{Hive.getTable(databaseName, > tblName, false)}} instead of {{Hive.getTable(databaseName, tblName)}} > {quote} > 15/02/13 17:24:34 [Sql Query events] ERROR hive.ql.metadata.Hive: > NoSuchObjectException(message:default.demotable table not found) > at > org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105) > at com.sun.proxy.$Proxy15.get_table(Unknown Source) > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) > at com.sun.proxy.$Proxy16.getTable(Unknown Source) > at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976) > at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) > at > org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) > at > org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) > at > org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) > at > org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) > at scala.Option.getOrElse(Option.scala:120) > at > org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) > at > org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) > at > org.apache.spark.sql.catalyst.analysis.Ana
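A short sketch of the change described in the issue, not necessarily the exact patch merged in the linked pull request: use the three-argument {{Hive.getTable}} overload mentioned above, whose last parameter suppresses the exception (and therefore the ERROR log) when the table is missing, and treat a {{null}} result as "table does not exist".

{code}
import org.apache.hadoop.hive.ql.metadata.Hive

object TableExistsSketch {
  def tableExists(hive: Hive, databaseName: String, tblName: String): Boolean =
    // getTable(db, table, throwException = false) returns null instead of throwing
    // (and logging) NoSuchObjectException when the table does not exist.
    hive.getTable(databaseName, tblName, false) != null
}
{code}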
[jira] [Closed] (SPARK-6180) Error logged into log4j when use the HiveMetastoreCatalog::tableExists
[ https://issues.apache.org/jira/browse/SPARK-6180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu closed SPARK-6180. -- Fixed > Error logged into log4j when use the HiveMetastoreCatalog::tableExists > -- > > Key: SPARK-6180 > URL: https://issues.apache.org/jira/browse/SPARK-6180 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.2.1 >Reporter: Jack Hu >Priority: Minor > Labels: Hive, HiveMetastoreCatalog, spark, starter > Fix For: 1.3.1 > > > When using {{HiveMetastoreCatalog.tableExists}} to check a table that does > not exist in hive store, there is one error message like this logged into log > file, the function returns {{false}} as desired. > To avoid this error log, one way is to use {{Hive.getTable(databaseName, > tblName, false)}} instead of {{Hive.getTable(databaseName, tblName)}} > {quote} > 15/02/13 17:24:34 [Sql Query events] ERROR hive.ql.metadata.Hive: > NoSuchObjectException(message:default.demotable table not found) > at > org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105) > at com.sun.proxy.$Proxy15.get_table(Unknown Source) > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) > at com.sun.proxy.$Proxy16.getTable(Unknown Source) > at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976) > at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) > at > org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) > at > org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) > at > org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) > at > org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) > at scala.Option.getOrElse(Option.scala:120) > at > org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) > at > org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138) > at > org.apache.spark.sql.catalyst.analysis.Analyz
[jira] [Commented] (SPARK-6180) Error logged into log4j when use the HiveMetastoreCatalog::tableExists
[ https://issues.apache.org/jira/browse/SPARK-6180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14375336#comment-14375336 ] Jack Hu commented on SPARK-6180: This issue has already fixed at: https://github.com/apache/spark/pull/4365 > Error logged into log4j when use the HiveMetastoreCatalog::tableExists > -- > > Key: SPARK-6180 > URL: https://issues.apache.org/jira/browse/SPARK-6180 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.2.1 >Reporter: Jack Hu >Priority: Minor > Labels: Hive, HiveMetastoreCatalog, spark, starter > > When using {{HiveMetastoreCatalog.tableExists}} to check a table that does > not exist in hive store, there is one error message like this logged into log > file, the function returns {{false}} as desired. > To avoid this error log, one way is to use {{Hive.getTable(databaseName, > tblName, false)}} instead of {{Hive.getTable(databaseName, tblName)}} > {quote} > 15/02/13 17:24:34 [Sql Query events] ERROR hive.ql.metadata.Hive: > NoSuchObjectException(message:default.demotable table not found) > at > org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105) > at com.sun.proxy.$Proxy15.get_table(Unknown Source) > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) > at com.sun.proxy.$Proxy16.getTable(Unknown Source) > at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976) > at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) > at > org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) > at > org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) > at > org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) > at > org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) > at scala.Option.getOrElse(Option.scala:120) > at > org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) > at > org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$Resolv
[jira] [Closed] (SPARK-3275) Socket receiver cannot recover when the socket server is restarted
[ https://issues.apache.org/jira/browse/SPARK-3275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu closed SPARK-3275. -- Resolution: Fixed Fix Version/s: 1.1.0 Checked in 1.1.0; did not find a similar issue. > Socket receiver cannot recover when the socket server is restarted > - > > Key: SPARK-3275 > URL: https://issues.apache.org/jira/browse/SPARK-3275 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.2 >Reporter: Jack Hu > Labels: failover > Fix For: 1.1.0 > > > To reproduce this issue: > 1. create an application with a socket dstream > 2. start the socket server and start the application > 3. restart the socket server > 4. the socket dstream fails to reconnect (it closes the connection > after a successful connect) > The main issue is that the status in SocketReceiver and ReceiverSupervisor > is incorrect after the reconnect: > In SocketReceiver::receive() the while loop is never entered after the > reconnect since isStopped returns true: > val iterator = bytesToObjects(socket.getInputStream()) > while(!isStopped && iterator.hasNext) { > store(iterator.next) > } > logInfo("Stopped receiving") > restart("Retrying connecting to " + host + ":" + port) > This is caused by the status flag "receiverState" in ReceiverSupervisor being > set to Stopped when the connection is lost; it is only reset after the call > to the receiver's onStart method: > def startReceiver(): Unit = synchronized { > try { > logInfo("Starting receiver") > receiver.onStart() > logInfo("Called receiver onStart") > onReceiverStart() > receiverState = Started > } catch { > case t: Throwable => > stop("Error starting receiver " + streamId, Some(t)) > } > } -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6180) Error logged into log4j when use the HiveMetastoreCatalog::tableExists
[ https://issues.apache.org/jira/browse/SPARK-6180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu updated SPARK-6180: --- Description: When using {{HiveMetastoreCatalog.tableExists}} to check a table that does not exist in hive store, there is one error message like this logged into log file, the function returns {{false}} as desired. To avoid this error log, one way is to use {{Hive.getTable(databaseName, tblName, false)}} instead of {{Hive.getTable(databaseName, tblName)}} {quote} 15/02/13 17:24:34 [Sql Query events] ERROR hive.ql.metadata.Hive: NoSuchObjectException(message:default.demotable table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560) at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105) at com.sun.proxy.$Proxy15.get_table(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997) at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) at com.sun.proxy.$Proxy16.getTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at 
scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.
[jira] [Created] (SPARK-6180) Error logged into log4j when use the HiveMetastoreCatalog::tableExists
Jack Hu created SPARK-6180: -- Summary: Error logged into log4j when use the HiveMetastoreCatalog::tableExists Key: SPARK-6180 URL: https://issues.apache.org/jira/browse/SPARK-6180 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 1.2.1 Reporter: Jack Hu Priority: Minor When using {{HiveMetastoreCatalog.tableExists}} to check a table that does not exist in hive store, there is one error message like this logged into log file, the function returns {{false}} as desired. Could we avoid to print error? (Although it can be disabled in log4j by configuration) {quote} 15/02/13 17:24:34 [Sql Query events] ERROR hive.ql.metadata.Hive: NoSuchObjectException(message:default.demotable table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560) at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105) at com.sun.proxy.$Proxy15.get_table(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:997) at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) at com.sun.proxy.$Proxy16.getTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:976) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) at scala.col
[jira] [Commented] (SPARK-6061) File source dstream cannot include old files whose timestamp is before the system time
[ https://issues.apache.org/jira/browse/SPARK-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14348230#comment-14348230 ] Jack Hu commented on SPARK-6061: [~tianyi] Do you know why {{FileInputDStream.MIN_REMEMBER_DURATION}} was introduced in 1.2.1 (actually, it was introduced in 1.1.1/1.2.0)? > File source dstream cannot include old files whose timestamp is before > the system time > -- > > Key: SPARK-6061 > URL: https://issues.apache.org/jira/browse/SPARK-6061 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.1 >Reporter: Jack Hu > Labels: FileSourceDStream, OlderFiles, Streaming > Original Estimate: 1m > Remaining Estimate: 1m > > The file source dstream (StreamingContext.fileStream) has a property named > "newFilesOnly" to include old files. It worked fine in 1.1.0 but is broken in > 1.2.1: the older files are always ignored no matter what value is > set. > Here is simple reproduction code: > https://gist.github.com/jhu-chang/1ee5b0788c7479414eeb > The reason is that the "modTimeIgnoreThreshold" in > FileInputDStream::findNewFiles is set to a time close to the system time (the > Spark Streaming clock time), so files older than this time are ignored. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6061) File source dstream cannot include old files whose timestamp is before the system time
[ https://issues.apache.org/jira/browse/SPARK-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14348093#comment-14348093 ] Jack Hu commented on SPARK-6061: [~srowen] The issue is: I want to process the old files in a file dstream, but the old files are still ignored even when {{newFilesOnly}} is set to {{false}} > File source dstream cannot include old files whose timestamp is before > the system time > -- > > Key: SPARK-6061 > URL: https://issues.apache.org/jira/browse/SPARK-6061 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.1 >Reporter: Jack Hu > Labels: FileSourceDStream, OlderFiles, Streaming > Original Estimate: 1m > Remaining Estimate: 1m > > The file source dstream (StreamingContext.fileStream) has a property named > "newFilesOnly" to include old files. It worked fine in 1.1.0 but is broken in > 1.2.1: the older files are always ignored no matter what value is > set. > Here is simple reproduction code: > https://gist.github.com/jhu-chang/1ee5b0788c7479414eeb > The reason is that the "modTimeIgnoreThreshold" in > FileInputDStream::findNewFiles is set to a time close to the system time (the > Spark Streaming clock time), so files older than this time are ignored. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-6061) File source dstream cannot include old files whose timestamp is before the system time
Jack Hu created SPARK-6061: -- Summary: File source dstream cannot include old files whose timestamp is before the system time Key: SPARK-6061 URL: https://issues.apache.org/jira/browse/SPARK-6061 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.2.1 Reporter: Jack Hu The file source dstream (StreamingContext.fileStream) has a property named "newFilesOnly" to include old files. It worked fine in 1.1.0 but is broken in 1.2.1: the older files are always ignored no matter what value is set. Here is simple reproduction code: https://gist.github.com/jhu-chang/1ee5b0788c7479414eeb The reason is that the "modTimeIgnoreThreshold" in FileInputDStream::findNewFiles is set to a time close to the system time (the Spark Streaming clock time), so files older than this time are ignored. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
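The linked gist is not reproduced here; the sketch below is a hypothetical stand-in for the kind of program the report describes: a fileStream opened with newFilesOnly = false over a directory that already contains files (the directory path, app name, and master are placeholders). On 1.2.1 the pre-existing files are never delivered, because findNewFiles ignores anything older than its modTimeIgnoreThreshold.
{code:scala}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OldFilesRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("old-files-repro").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Third argument is newFilesOnly: false is supposed to pick up files that
    // already exist in the directory when the stream starts.
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "hdfs:///tmp/existing-input",   // placeholder directory with pre-existing files
      (path: Path) => true,           // accept every file
      false
    ).map(_._2.toString)
    lines.count().print()             // on 1.2.1 the old files never show up here
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}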
[jira] [Commented] (SPARK-3276) Provide an API to specify whether old files should be ignored in the file input text DStream
[ https://issues.apache.org/jira/browse/SPARK-3276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14283662#comment-14283662 ] Jack Hu commented on SPARK-3276: In some cases, the old files (older than the current Spark system time) are needed: for example, if you have a fixed list in HDFS that you want to correlate with the input stream, you need to load it from the file system. As for the newFilesOnly option, it is broken in Spark 1.2 (it works in 1.1). > Provide an API to specify whether old files should be ignored in the file > input text DStream > > > Key: SPARK-3276 > URL: https://issues.apache.org/jira/browse/SPARK-3276 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Jack Hu >Priority: Minor > > Currently, there is only one API, textFileStream in StreamingContext, to create > a text file dstream, and it always ignores old files. Sometimes the old files > are still useful. > We need an API that lets the user choose whether old files should be ignored > or not. > The API currently in StreamingContext: > def textFileStream(directory: String): DStream[String] = { > fileStream[LongWritable, Text, > TextInputFormat](directory).map(_._2.toString) > } -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3276) Provide an API to specify whether old files should be ignored in the file input text DStream
Jack Hu created SPARK-3276: -- Summary: Provide an API to specify whether old files should be ignored in the file input text DStream Key: SPARK-3276 URL: https://issues.apache.org/jira/browse/SPARK-3276 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.0.2 Reporter: Jack Hu Currently, there is only one API, textFileStream in StreamingContext, to create a text file dstream, and it always ignores old files. Sometimes the old files are still useful. We need an API that lets the user choose whether old files should be ignored or not. The API currently in StreamingContext: def textFileStream(directory: String): DStream[String] = { fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
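The request above can be made concrete with a hypothetical overload. This did not exist in the version discussed here; the overload itself and the accept-all filter are assumptions for the sketch. It simply forwards a newFilesOnly flag to the existing fileStream call quoted in the description:
{code:scala}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.DStream

// Hypothetical addition to StreamingContext (sketch only): a text-file stream
// that lets the caller decide whether pre-existing files are processed.
def textFileStream(directory: String, newFilesOnly: Boolean): DStream[String] = {
  fileStream[LongWritable, Text, TextInputFormat](
    directory,
    (path: Path) => true,   // keep every file; callers needing filtering can use fileStream directly
    newFilesOnly
  ).map(_._2.toString)
}
{code}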
[jira] [Created] (SPARK-3275) Socket receiver cannot recover when the socket server is restarted
Jack Hu created SPARK-3275: -- Summary: Socket receiver cannot recover when the socket server is restarted Key: SPARK-3275 URL: https://issues.apache.org/jira/browse/SPARK-3275 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.0.2 Reporter: Jack Hu To reproduce this issue: 1. create an application with a socket dstream 2. start the socket server and start the application 3. restart the socket server 4. the socket dstream fails to reconnect (it closes the connection after a successful connect) The main issue is that the status in SocketReceiver and ReceiverSupervisor is incorrect after the reconnect: In SocketReceiver::receive() the while loop is never entered after the reconnect since isStopped returns true: val iterator = bytesToObjects(socket.getInputStream()) while(!isStopped && iterator.hasNext) { store(iterator.next) } logInfo("Stopped receiving") restart("Retrying connecting to " + host + ":" + port) This is caused by the status flag "receiverState" in ReceiverSupervisor being set to Stopped when the connection is lost; it is only reset after the call to the receiver's onStart method: def startReceiver(): Unit = synchronized { try { logInfo("Starting receiver") receiver.onStart() logInfo("Called receiver onStart") onReceiverStart() receiverState = Started } catch { case t: Throwable => stop("Error starting receiver " + streamId, Some(t)) } } -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
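One direction for a fix, sketched against the startReceiver snippet above: flip receiverState to Started before receiver.onStart() runs, so a reconnect triggered from inside onStart() no longer observes a stale Stopped state. This is only an illustration of the idea (whether the flag can safely be set before onStart() has succeeded is an assumption), not necessarily the change that actually shipped in 1.1.0.
{code:scala}
// Sketch only: the same method as quoted above, with the state flag moved up
// so SocketReceiver.receive() observes isStopped == false after a reconnect.
def startReceiver(): Unit = synchronized {
  try {
    logInfo("Starting receiver")
    receiverState = Started      // mark the supervisor as started first ...
    receiver.onStart()           // ... so receive() can enter its while loop
    logInfo("Called receiver onStart")
    onReceiverStart()
  } catch {
    case t: Throwable =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
{code}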
[jira] [Updated] (SPARK-3274) Spark Streaming Java API reports java.lang.ClassCastException when calling collectAsMap on JavaPairDStream
[ https://issues.apache.org/jira/browse/SPARK-3274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jack Hu updated SPARK-3274: --- Description: Reproduce code: scontext .socketTextStream("localhost", 1) .mapToPair(new PairFunction(){ public Tuple2 call(String arg0) throws Exception { return new Tuple2("1", arg0); } }) .foreachRDD(new Function2, Time, Void>() { public Void call(JavaPairRDD v1, Time v2) throws Exception { System.out.println(v2.toString() + ": " + v1.collectAsMap().toString()); return null; } }); Exception: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lscala.Tupl e2; at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.s cala:447) at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala: 464) at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:90) at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:88) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachR DD$2.apply(JavaDStreamLike.scala:282) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachR DD$2.apply(JavaDStreamLike.scala:282) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mc V$sp(ForEachDStream.scala:41) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(Fo rEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(Fo rEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobS was: Reproduce code: scontext .socketTextStream("localhost", 1) .mapToPair(new PairFunction(){ public Tuple2 call(String arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2("1", arg0); } }) .foreachRDD(new Function2, Time, Void>() { public Void call(JavaPairRDD v1, Time v2) throws Exception { System.out.println(v2.toString() + ": " + v1.collectAsMap().toString()); return null; } }); Exception: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lscala.Tupl e2; at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.s cala:447) at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala: 464) at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:90) at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:88) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachR DD$2.apply(JavaDStreamLike.scala:282) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachR DD$2.apply(JavaDStreamLike.scala:282) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mc V$sp(ForEachDStream.scala:41) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(Fo rEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(Fo rEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobS > Spark Streaming Java API reports java.lang.ClassCastException when calling > collectAsMap on JavaPairDStream > -- > > Key: SPARK-3274 > URL: https://issues.apache.org/jira/browse/SPARK-3274 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 1.0.2 >Reporter: Jack Hu > > Reproduce code: > scontext > .socketTextStream("localhost", 1) > .mapToPair(new PairFunction(){ > public Tuple2 call(String arg0) > throws Exception { > 
return new Tuple2("1", arg0); > } > }) > .foreachRDD(new Function2, Time, > Void>() { > public Void call(JavaPairRDD v1, Time > v2) throws Exception { >
[jira] [Created] (SPARK-3274) Spark Streaming Java API reports java.lang.ClassCastException when calling collectAsMap on JavaPairDStream
Jack Hu created SPARK-3274: -- Summary: Spark Streaming Java API reports java.lang.ClassCastException when calling collectAsMap on JavaPairDStream Key: SPARK-3274 URL: https://issues.apache.org/jira/browse/SPARK-3274 Project: Spark Issue Type: Bug Components: Java API Affects Versions: 1.0.2 Reporter: Jack Hu Reproduce code: scontext .socketTextStream("localhost", 1) .mapToPair(new PairFunction<String, String, String>(){ public Tuple2<String, String> call(String arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, String>("1", arg0); } }) .foreachRDD(new Function2<JavaPairRDD<String, String>, Time, Void>() { public Void call(JavaPairRDD<String, String> v1, Time v2) throws Exception { System.out.println(v2.toString() + ": " + v1.collectAsMap().toString()); return null; } }); Exception: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lscala.Tuple2; at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:447) at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:464) at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:90) at tuk.usecase.failedcall.FailedCall$1.call(FailedCall.java:88) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobS -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org