[jira] [Commented] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval
[ https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646665#comment-15646665 ] Mario Briggs commented on SPARK-16545: -- [~lwlin] I agree with the PR discussion. I am not terribly sure what the value of the 'Resolution' state should be when closing... 'Later', for example, to indicate this is being fixed elsewhere, etc. > Structured Streaming : foreachSink creates the Physical Plan multiple times > per TriggerInterval > > > Key: SPARK-16545 > URL: https://issues.apache.org/jira/browse/SPARK-16545 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.0 >Reporter: Mario Briggs > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17630) jvm-exit-on-fatal-error handler for spark.rpc.netty like there is available for akka
[ https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15587664#comment-15587664 ] Mario Briggs commented on SPARK-17630: -- [~zsxwing] thanks very much. Any pointers on how/where to add the code, or something existing in the code base to look at? I can then try a PR. > jvm-exit-on-fatal-error handler for spark.rpc.netty like there is available > for akka > > > Key: SPARK-17630 > URL: https://issues.apache.org/jira/browse/SPARK-17630 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Mario Briggs > Attachments: SecondCodePath.txt, firstCodepath.txt > > > Hi, > I have 2 code paths from my app that result in a JVM OOM. > In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts > down the JVM, so that the caller (py4J) gets notified with a proper stack trace. > Attached stack-trace file (firstCodepath.txt) > In the 2nd code path (rpc.netty), no such handler kicks in to shut down the > JVM, so the caller does not get notified. > Attached stack-trace file (SecondCodepath.txt) > Is it possible to have a JVM exit handler for the rpc.netty path? >
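The kind of JVM-wide hook being asked for can be sketched in plain Java with `Thread.setDefaultUncaughtExceptionHandler`. This is a minimal, self-contained sketch, not Spark's actual code: the class and field names are invented for illustration, and a real handler would call `Runtime.getRuntime().halt(1)` where this sketch only records the decision, so the example stays runnable.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class FatalErrorHandlerSketch {
    // Records that the handler decided to exit; a real handler would halt instead.
    static final AtomicBoolean wouldExit = new AtomicBoolean(false);

    static void install() {
        // JVM-wide default handler: fires for any thread with no handler of its own,
        // which is the rough analogue of what 'akka.jvm-exit-on-fatal-error' provides.
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            // Only unrecoverable Errors (OutOfMemoryError etc.) should bring the JVM down.
            if (throwable instanceof Error) {
                wouldExit.set(true); // real code: Runtime.getRuntime().halt(1);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        install();
        // Simulate a fatal error escaping a background thread (no real allocation happens).
        Thread t = new Thread(() -> { throw new OutOfMemoryError("simulated"); });
        t.start();
        t.join();
        System.out.println("wouldExit=" + wouldExit.get()); // prints wouldExit=true
    }
}
```

The point of halting (rather than exiting cleanly) in a real handler is that the process dies promptly, so a caller such as py4J sees the connection drop instead of hanging.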
[jira] [Comment Edited] (SPARK-17917) Convert 'Initial job has not accepted any resources..' logWarning to a SparkListener event
[ https://issues.apache.org/jira/browse/SPARK-17917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15574639#comment-15574639 ] Mario Briggs edited comment on SPARK-17917 at 10/18/16 6:49 PM: >> I don't have a strong feeling on this partly because I'm not sure what the action then is – kill the job? << Here is an example - let's say I am using a notebook and have kicked off some Spark actions that don't get executors because the user/org/group quotas of executors have been exhausted. These events can be used by the notebook implementor to surface the issue to the user via a UI update on that cell itself, maybe even to additionally query the user/org/group quotas, show which apps are using up the quotas, etc., and allow the user to take whatever action is required (kill the other jobs, just wait on this job, etc.). Therefore I am not looking to define on the event, in any way, what the set of actions can be, since that would be very implementation specific. >> Maybe, I suppose it will be a little tricky to define what the event is here << Were you referring to the actual arguments of the event method? I can give a shot at defining them and then look for feedback. was (Author: mariobriggs): >> I don't have a strong feeling on this partly because I'm not sure what the action then is – kill the job? << Here is an example - Lets say i am using a notebook and kicked off some spark actions that dont' get executors because user/org/group quota's of executors have been exhausted. These events can be used by the notebook implementor to then surface the issue to the user via a UI update on that cell itself, maybe even additionally query the user/org/group quota's show which apps are using up the quota's etc and allow the user to take what action required (kill the other jobs, just wait on this job etc). Therefore not looking to define in anyway on the event, what the set of actions can be, since that would be very implementation specific. 
>> Maybe, I suppose it will be a little tricky to define what the event is here << Where you referring to the actual arguments of the event method. I can give a shot at defining and then look for feedback > Convert 'Initial job has not accepted any resources..' logWarning to a > SparkListener event > -- > > Key: SPARK-17917 > URL: https://issues.apache.org/jira/browse/SPARK-17917 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: Mario Briggs >Priority: Minor > > When supporting Spark on a large, multi-tenant shared cluster with quotas per > tenant, a submitted taskSet often might not get executors because quotas have > been exhausted or resources are unavailable. In these situations, firing a > SparkListener event instead of just logging the issue (as done currently at > https://github.com/apache/spark/blob/9216901d52c9c763bfb908013587dcf5e781f15b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L192) > would give applications/listeners an opportunity to handle this more > appropriately as needed.
[jira] [Commented] (SPARK-17917) Convert 'Initial job has not accepted any resources..' logWarning to a SparkListener event
[ https://issues.apache.org/jira/browse/SPARK-17917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15574639#comment-15574639 ] Mario Briggs commented on SPARK-17917: -- >> I don't have a strong feeling on this partly because I'm not sure what the action then is – kill the job? << Here is an example - let's say I am using a notebook and have kicked off some Spark actions that don't get executors because the user/org/group quotas of executors have been exhausted. These events can be used by the notebook implementor to surface the issue to the user via a UI update on that cell itself, maybe even to additionally query the user/org/group quotas, show which apps are using up the quotas, etc., and allow the user to take whatever action is required (kill the other jobs, just wait on this job, etc.). Therefore I am not looking to define on the event, in any way, what the set of actions can be, since that would be very implementation specific. >> Maybe, I suppose it will be a little tricky to define what the event is here << Were you referring to the actual arguments of the event method? I can give a shot at defining them and then look for feedback. > Convert 'Initial job has not accepted any resources..' logWarning to a > SparkListener event > -- > > Key: SPARK-17917 > URL: https://issues.apache.org/jira/browse/SPARK-17917 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: Mario Briggs >Priority: Minor > > When supporting Spark on a large, multi-tenant shared cluster with quotas per > tenant, a submitted taskSet often might not get executors because quotas have > been exhausted or resources are unavailable. 
In these situations, firing a > SparkListener event instead of just logging the issue (as done currently at > https://github.com/apache/spark/blob/9216901d52c9c763bfb908013587dcf5e781f15b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L192) > would give applications/listeners an opportunity to handle this more > appropriately as needed.
[jira] [Commented] (SPARK-17917) Convert 'Initial job has not accepted any resources..' logWarning to a SparkListener event
[ https://issues.apache.org/jira/browse/SPARK-17917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572795#comment-15572795 ] Mario Briggs commented on SPARK-17917: -- I would appreciate it if the Spark devs could comment on whether they see this as a bad idea for some reason. I basically see adding 2 events to SparkListener, like onTaskStarved() and onTaskUnStarved() - the latter fires only if onTaskStarved() fired in the first place for a taskSet. > Convert 'Initial job has not accepted any resources..' logWarning to a > SparkListener event > -- > > Key: SPARK-17917 > URL: https://issues.apache.org/jira/browse/SPARK-17917 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: Mario Briggs > > When supporting Spark on a large, multi-tenant shared cluster with quotas per > tenant, a submitted taskSet often might not get executors because quotas have > been exhausted or resources are unavailable. In these situations, firing a > SparkListener event instead of just logging the issue (as done currently at > https://github.com/apache/spark/blob/9216901d52c9c763bfb908013587dcf5e781f15b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L192) > would give applications/listeners an opportunity to handle this more > appropriately as needed.
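The proposed pair of hooks can be sketched in plain Java to make the shape concrete. Everything below (TaskSetStarvedEvent, onTaskSetStarved/onTaskSetUnstarved, the notebook-style consumer) is hypothetical, not an existing Spark API; it only illustrates how a listener could turn the starvation signal into, say, a UI update on a notebook cell.

```java
import java.util.ArrayList;
import java.util.List;

public class StarvationListenerSketch {

    // Hypothetical event payload: enough context for a notebook UI to act on.
    static class TaskSetStarvedEvent {
        final int stageId;
        final String reason;
        TaskSetStarvedEvent(int stageId, String reason) {
            this.stageId = stageId;
            this.reason = reason;
        }
    }

    // Hypothetical listener hooks mirroring the onTaskStarved()/onTaskUnStarved() idea;
    // the "unstarved" hook fires only if a starved event preceded it for the taskSet.
    interface StarvationListener {
        void onTaskSetStarved(TaskSetStarvedEvent event); // fired where the logWarning happens today
        void onTaskSetUnstarved(int stageId);
    }

    static final List<String> uiUpdates = new ArrayList<String>();

    public static void main(String[] args) {
        uiUpdates.clear();
        // A notebook implementor's listener: surface the issue instead of burying it in a log.
        StarvationListener listener = new StarvationListener() {
            public void onTaskSetStarved(TaskSetStarvedEvent e) {
                uiUpdates.add("stage " + e.stageId + " starved: " + e.reason);
            }
            public void onTaskSetUnstarved(int stageId) {
                uiUpdates.add("stage " + stageId + " got resources");
            }
        };
        // The scheduler would fire these instead of (or alongside) the warning message.
        listener.onTaskSetStarved(new TaskSetStarvedEvent(3, "executor quota exhausted"));
        listener.onTaskSetUnstarved(3);
        for (String u : uiUpdates) System.out.println(u);
    }
}
```

Note that the event deliberately carries only observations (stage, reason), not prescribed actions, matching the point above that the response should stay implementation specific.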
[jira] [Created] (SPARK-17917) Convert 'Initial job has not accepted any resources..' logWarning to a SparkListener event
Mario Briggs created SPARK-17917: Summary: Convert 'Initial job has not accepted any resources..' logWarning to a SparkListener event Key: SPARK-17917 URL: https://issues.apache.org/jira/browse/SPARK-17917 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Mario Briggs When supporting Spark on a large, multi-tenant shared cluster with quotas per tenant, a submitted taskSet often might not get executors because quotas have been exhausted or resources are unavailable. In these situations, firing a SparkListener event instead of just logging the issue (as done currently at https://github.com/apache/spark/blob/9216901d52c9c763bfb908013587dcf5e781f15b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L192) would give applications/listeners an opportunity to handle this more appropriately as needed.
[jira] [Updated] (SPARK-17630) jvm-exit-on-fatal-error handler for spark.rpc.netty like there is available for akka
[ https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mario Briggs updated SPARK-17630: - Summary: jvm-exit-on-fatal-error handler for spark.rpc.netty like there is available for akka (was: jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka) > jvm-exit-on-fatal-error handler for spark.rpc.netty like there is available > for akka > > > Key: SPARK-17630 > URL: https://issues.apache.org/jira/browse/SPARK-17630 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Mario Briggs > Attachments: SecondCodePath.txt, firstCodepath.txt > > > Hi, > I have 2 code paths from my app that result in a JVM OOM. > In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts > down the JVM, so that the caller (py4J) gets notified with a proper stack trace. > Attached stack-trace file (firstCodepath.txt) > In the 2nd code path (rpc.netty), no such handler kicks in to shut down the > JVM, so the caller does not get notified. > Attached stack-trace file (SecondCodepath.txt) > Is it possible to have a JVM exit handler for the rpc.netty path? >
[jira] [Updated] (SPARK-17630) jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka
[ https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mario Briggs updated SPARK-17630: - Description: Hi, I have 2 code paths from my app that result in a JVM OOM. In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts down the JVM, so that the caller (py4J) gets notified with a proper stack trace. Attached stack-trace file (firstCodepath.txt) In the 2nd code path (rpc.netty), no such handler kicks in to shut down the JVM, so the caller does not get notified. Attached stack-trace file (SecondCodepath.txt) Is it possible to have a JVM exit handler for the rpc.netty path? was: Hi, I have 2 code paths from my app that result in a JVM OOM. In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts down the JVM, so that the caller (py4J) gets notified with a proper stack trace. In the 2nd code path (rpc.netty), no such handler kicks in to shut down the JVM, so the caller does not get notified. Is it possible to have a JVM exit handler for the rpc.netty path? First code path trace file - > jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka > > > Key: SPARK-17630 > URL: https://issues.apache.org/jira/browse/SPARK-17630 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Mario Briggs > Attachments: SecondCodePath.txt, firstCodepath.txt > > > Hi, > I have 2 code paths from my app that result in a JVM OOM. > In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts > down the JVM, so that the caller (py4J) gets notified with a proper stack trace. > Attached stack-trace file (firstCodepath.txt) > In the 2nd code path (rpc.netty), no such handler kicks in to shut down the > JVM, so the caller does not get notified. > Attached stack-trace file (SecondCodepath.txt) > Is it possible to have a JVM exit handler for the rpc.netty path? 
[jira] [Updated] (SPARK-17630) jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka
[ https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mario Briggs updated SPARK-17630: - Attachment: SecondCodePath.txt > jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka > > > Key: SPARK-17630 > URL: https://issues.apache.org/jira/browse/SPARK-17630 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Mario Briggs > Attachments: SecondCodePath.txt, firstCodepath.txt > > > Hi, > I have 2 code paths from my app that result in a JVM OOM. > In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts > down the JVM, so that the caller (py4J) gets notified with a proper stack trace > In the 2nd code path (rpc.netty), no such handler kicks in to shut down the > JVM, so the caller does not get notified. > Is it possible to have a JVM exit handler for the rpc.netty path? > First code path trace file - >
[jira] [Updated] (SPARK-17630) jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka
[ https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mario Briggs updated SPARK-17630: - Attachment: firstCodepath.txt > jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka > > > Key: SPARK-17630 > URL: https://issues.apache.org/jira/browse/SPARK-17630 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Mario Briggs > Attachments: firstCodepath.txt > > > Hi, > I have 2 code paths from my app that result in a JVM OOM. > In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts > down the JVM, so that the caller (py4J) gets notified with a proper stack trace > In the 2nd code path (rpc.netty), no such handler kicks in to shut down the > JVM, so the caller does not get notified. > Is it possible to have a JVM exit handler for the rpc.netty path? > First code path trace file - >
[jira] [Updated] (SPARK-17630) jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka
[ https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mario Briggs updated SPARK-17630: - Description: Hi, I have 2 code paths from my app that result in a JVM OOM. In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts down the JVM, so that the caller (py4J) gets notified with a proper stack trace. In the 2nd code path (rpc.netty), no such handler kicks in to shut down the JVM, so the caller does not get notified. Is it possible to have a JVM exit handler for the rpc.netty path? First code path trace file - was: Hi, I have 2 code paths from my app that result in a JVM OOM. In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts down the JVM, so that the caller (py4J) gets notified with a proper stack trace. In the 2nd code path (rpc.netty), no such handler kicks in to shut down the JVM, so the caller does not get notified. Is it possible to have a JVM exit handler for the rpc.netty path? First code path trace > jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka > > > Key: SPARK-17630 > URL: https://issues.apache.org/jira/browse/SPARK-17630 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Mario Briggs > > Hi, > I have 2 code paths from my app that result in a JVM OOM. > In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts > down the JVM, so that the caller (py4J) gets notified with a proper stack trace > In the 2nd code path (rpc.netty), no such handler kicks in to shut down the > JVM, so the caller does not get notified. > Is it possible to have a JVM exit handler for the rpc.netty path? > First code path trace file - >
[jira] [Created] (SPARK-17630) jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka
Mario Briggs created SPARK-17630: Summary: jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka Key: SPARK-17630 URL: https://issues.apache.org/jira/browse/SPARK-17630 Project: Spark Issue Type: Question Components: Spark Core Affects Versions: 1.6.0 Reporter: Mario Briggs Hi, I have 2 code paths from my app that result in a JVM OOM. In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts down the JVM, so that the caller (py4J) gets notified with a proper stack trace. In the 2nd code path (rpc.netty), no such handler kicks in to shut down the JVM, so the caller does not get notified. Is it possible to have a JVM exit handler for the rpc.netty path? First code path trace
[jira] [Comment Edited] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval
[ https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378811#comment-15378811 ] Mario Briggs edited comment on SPARK-16545 at 7/15/16 3:57 AM: --- thanks. I looked into it too and I was not getting to a fix that I found satisfying. The problem seems to be that Dataset assumes it has a QueryExecution with the physical plan (which is true in the batch case), since most of the listener/metrics-gathering functions want to dump this info, whereas in streaming we want only the 'inner' IncrementalExecution to produce the physical plan. I will submit what I have tried to do as well, to ease the discussion points. was (Author: mariobriggs): thanks. I looked into it too and i was not getting a fix that was satisfying to myself. The problem seems to be that Dataset assumes it has a QueryExecution with the Physical Plan (which is true in the batch case), since most of the Listener/metrics gathering functions want to dump this info , whereas in streaming we want only the 'inner' IncrementalExecution to produce the PhysicalPlan. I will submit have i tried to do as well to ease the discussion points. > Structured Streaming : foreachSink creates the Physical Plan multiple times > per TriggerInterval > > > Key: SPARK-16545 > URL: https://issues.apache.org/jira/browse/SPARK-16545 > Project: Spark > Issue Type: Bug > Components: SQL, Streaming >Affects Versions: 2.0.0 >Reporter: Mario Briggs >
[jira] [Commented] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval
[ https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378811#comment-15378811 ] Mario Briggs commented on SPARK-16545: -- thanks. I looked into it too and I was not getting to a fix that I found satisfying. The problem seems to be that Dataset assumes it has a QueryExecution with the physical plan (which is true in the batch case), since most of the listener/metrics-gathering functions want to dump this info, whereas in streaming we want only the 'inner' IncrementalExecution to produce the physical plan. I will submit what I have tried to do as well, to ease the discussion points. > Structured Streaming : foreachSink creates the Physical Plan multiple times > per TriggerInterval > > > Key: SPARK-16545 > URL: https://issues.apache.org/jira/browse/SPARK-16545 > Project: Spark > Issue Type: Bug > Components: SQL, Streaming >Affects Versions: 2.0.0 >Reporter: Mario Briggs >
[jira] [Comment Edited] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval
[ https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376758#comment-15376758 ] Mario Briggs edited comment on SPARK-16545 at 7/14/16 11:01 AM: While looking at the performance of Structured Streaming, I found some excessive time being spent in the driver. Looking further into this, I found the time is spent in multiple (3, to be exact) initialisations of [QueryExecution.executedPlan|https://github.com/mariobriggs/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L85], due to multiple instances of QueryExecution being created in forEachSink.addBatch. Creating the physical plan takes more time and hence shouldn't be done more than once per TriggerInterval was (Author: mariobriggs): While looking at the performance of Structured Streaming, I found some excessive time being spent in the driver. Looking further into this, I found the time is spent in multiple (3, to be exact) initialisations of [QueryExecution.executedPlan|https://github.com/mariobriggs/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L85], due to multiple instances of QueryExecution being created in forEachSink.addBatch. Creating the physical plan takes more time and hence shouldn't be done more than once > Structured Streaming : foreachSink creates the Physical Plan multiple times > per TriggerInterval > > > Key: SPARK-16545 > URL: https://issues.apache.org/jira/browse/SPARK-16545 > Project: Spark > Issue Type: Bug > Components: SQL, Streaming >Affects Versions: 2.0.0 >Reporter: Mario Briggs > Fix For: 2.0.0 > >
[jira] [Comment Edited] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval
[ https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376758#comment-15376758 ] Mario Briggs edited comment on SPARK-16545 at 7/14/16 10:53 AM: While looking at the performance of Structured Streaming, I found some excessive time being spent in the driver. Looking further into this, I found the time is spent in multiple (3, to be exact) initialisations of [QueryExecution.executedPlan|https://github.com/mariobriggs/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L85], due to multiple instances of QueryExecution being created in forEachSink.addBatch. Creating the physical plan takes more time and hence shouldn't be done more than once was (Author: mariobriggs): While looking at the performance of Structured Streaming, I found some excessive time being spent in the driver. Looking further into this, I found the time is spent in multiple (3, to be exact) initialisations of QueryExecution.executedPlan, due to multiple instances of QueryExecution being created in forEachSink.addBatch. Creating the physical plan takes more time and hence shouldn't be done more than once > Structured Streaming : foreachSink creates the Physical Plan multiple times > per TriggerInterval > > > Key: SPARK-16545 > URL: https://issues.apache.org/jira/browse/SPARK-16545 > Project: Spark > Issue Type: Bug > Components: SQL, Streaming >Affects Versions: 2.0.0 >Reporter: Mario Briggs > Fix For: 2.0.0 > >
[jira] [Commented] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval
[ https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376758#comment-15376758 ] Mario Briggs commented on SPARK-16545: -- While looking at the performance of Structured Streaming, I found some excessive time being spent in the driver. Looking further into this, I found the time is spent in multiple (3, to be exact) initialisations of QueryExecution.executedPlan, due to multiple instances of QueryExecution being created in forEachSink.addBatch. Creating the physical plan takes more time and hence shouldn't be done more than once > Structured Streaming : foreachSink creates the Physical Plan multiple times > per TriggerInterval > > > Key: SPARK-16545 > URL: https://issues.apache.org/jira/browse/SPARK-16545 > Project: Spark > Issue Type: Bug > Components: SQL, Streaming >Affects Versions: 2.0.0 >Reporter: Mario Briggs > Fix For: 2.0.0 > >
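The cost pattern described above comes from executedPlan being a lazy val on QueryExecution, so it is memoized per instance: constructing a fresh QueryExecution for each consumer of the plan replans every time, while reusing one instance plans once. A plain-Java analogy (no Spark dependencies; class and method names are illustrative only, not Spark's):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyPlanSketch {
    // Counts how many times the "expensive physical planning" runs.
    static final AtomicInteger plannings = new AtomicInteger();

    // Stands in for QueryExecution: the plan is memoized per instance,
    // like a Scala lazy val.
    static class QueryExecutionLike {
        private Object executedPlan;

        Object executedPlan() {
            if (executedPlan == null) {
                plannings.incrementAndGet(); // expensive physical planning happens here
                executedPlan = new Object();
            }
            return executedPlan;
        }
    }

    public static void main(String[] args) {
        plannings.set(0);
        // What the addBatch path effectively did: a fresh instance per consumer,
        // so the plan is built three times in one trigger.
        new QueryExecutionLike().executedPlan();
        new QueryExecutionLike().executedPlan();
        new QueryExecutionLike().executedPlan();
        int perTrigger = plannings.get();

        // Reusing one instance memoizes: one planning no matter how many callers.
        plannings.set(0);
        QueryExecutionLike shared = new QueryExecutionLike();
        shared.executedPlan();
        shared.executedPlan();
        shared.executedPlan();
        System.out.println(perTrigger + " vs " + plannings.get()); // prints 3 vs 1
    }
}
```

This is why sharing a single (Incremental)QueryExecution per trigger, rather than constructing one per consumer, removes the repeated planning cost.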
[jira] [Created] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval
Mario Briggs created SPARK-16545: Summary: Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval Key: SPARK-16545 URL: https://issues.apache.org/jira/browse/SPARK-16545 Project: Spark Issue Type: Bug Components: SQL, Streaming Affects Versions: 2.0.0 Reporter: Mario Briggs Fix For: 2.0.0
[jira] [Commented] (SPARK-15089) kafka-spark consumer with SSL problem
[ https://issues.apache.org/jira/browse/SPARK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15270203#comment-15270203 ] Mario Briggs commented on SPARK-15089: -- Kafka supports SSL only with the 0.9.x Kafka client APIs. The Spark-Kafka connector you are exercising (KafkaUtils.createDirectStream) uses the 0.8 Kafka APIs, which do not support SSL. Your standalone Kafka consumer program is using the 0.9 Kafka client. If you are willing to play with the edge, then this PR has the code that uses the Kafka 0.9 client APIs - https://github.com/apache/spark/pull/11863 and the JIRA is https://issues.apache.org/jira/browse/SPARK-12177 > kafka-spark consumer with SSL problem > - > > Key: SPARK-15089 > URL: https://issues.apache.org/jira/browse/SPARK-15089 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.6.1 >Reporter: JasonChang > > I am not sure Spark Streaming supports SSL > I tried to add params to kafkaParams, but it did not work > {code} > JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new > Duration(1)); > Set<String> topicmap = new HashSet<String>(); > topicmap.add(kafkaTopic); > Map<String, String> kafkaParams = new HashMap<String, String>(); > kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url); > kafkaParams.put("security.protocol", "SSL"); > kafkaParams.put("ssl.keystore.type", "JKS"); > kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks"); > kafkaParams.put("ssl.keystore.password", "password"); > kafkaParams.put("ssl.key.password", "password"); > kafkaParams.put("ssl.truststore.type", "JKS"); > kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks"); > kafkaParams.put("ssl.truststore.password", "password"); > kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic); > JavaPairInputDStream<String, String> stream = > KafkaUtils.createDirectStream(jsc, > String.class, > String.class, > StringDecoder.class, > StringDecoder.class, > kafkaParams, > topicmap > ); > JavaDStream<String> lines = stream.map(new Function<Tuple2<String, String>, String>() { > public String call(Tuple2<String, String> tuple2) { > return tuple2._2(); > } > }); > {code} > {code} > Exception in thread "main" org.apache.spark.SparkException: > java.io.EOFException: Received -1 when reading from channel, socket has > likely been closed. > at > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) > at > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) > at scala.util.Either.fold(Either.scala:97) > at > org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) > at > org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) > at > org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) > at > org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) > at > org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) > {code}
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15259530#comment-15259530 ] Mario Briggs commented on SPARK-12177: -- What's the thinking on this one with respect to Structured Streaming? Is the thinking that Kafka 0.9 will be supported with both the older DStream API (KafkaUtils.createDirectStream) AND the newer Structured Streaming way of doing things, or that Kafka 0.9 will be supported only with the new Structured Streaming way? I am going to assume only [~tdas] or [~rxin] have a good idea on Structured Streaming (sorry [~mgrover], [~c...@koeninger.org] if I have insulted you :-) ), so I'd appreciate it if they could chime in. For my side, I am assuming that 0.9 will be supported with the older DStream API as well. [~mgrover] how is it going with merging Cody's changes and making a new subproject? > Update KafkaDStreams to new Kafka 0.9 Consumer API > -- > > Key: SPARK-12177 > URL: https://issues.apache.org/jira/browse/SPARK-12177 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.6.0 >Reporter: Nikita Tarasenko > Labels: consumer, kafka > > Kafka 0.9 has already been released and it introduces a new consumer API that is not > compatible with the old one. So I added the new consumer API, in separate > classes in package org.apache.spark.streaming.kafka.v09, with the changed API. I > did not remove the old classes, for backward compatibility. Users will not need > to change their old Spark applications when they upgrade to the new Spark version. > Please review my changes -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14745) CEP support in Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-14745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15251353#comment-15251353 ] Mario Briggs commented on SPARK-14745: -- Hi Sean, thanks for 'fixing' the target. To answer your question: right now the code contains a first-cut, minimal implementation that I am hoping others can comment on, in case there are more efficient approaches to implement (a). Then there is the task of seeing how this fits in with Structured Streaming, now that quite a bit of info is available on that topic (b). Both (a) and (b) would put us in a better position to answer your question. For example with (b), would this be a UDF or not, and if so, what does that mean for the implementation and for users w.r.t. performance and/or ease of use. Also, in the initial Structured Streaming proposal (https://issues.apache.org/jira/secure/attachment/12793419/StreamingDataFrameProposal.pdf) the 'Event-trigger' section kind of left me with the impression that it was aiming at pattern matching (I am not yet up to date on the new one), so I am curious about that. Finally, if (a) leads to some more options, those might lead to looking at some changes that make sense. Sorry for the slightly long-winded answer, but I hope it gives an idea of the current options. > CEP support in Spark Streaming > -- > > Key: SPARK-14745 > URL: https://issues.apache.org/jira/browse/SPARK-14745 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Mario Briggs > Attachments: SparkStreamingCEP.pdf > > > Complex Event Processing is an often-used feature in Streaming applications. > Spark Streaming currently does not have a DSL/API for it. This JIRA is about > how/what we can add in Spark Streaming to support CEP out of the box -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14597) Streaming Listener timing metrics should include time spent in JobGenerator's graph.generateJobs
[ https://issues.apache.org/jira/browse/SPARK-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15251285#comment-15251285 ] Mario Briggs commented on SPARK-14597: -- Yeah, I think it is neat. Looks good to me. Thanks > Streaming Listener timing metrics should include time spent in JobGenerator's > graph.generateJobs > > > Key: SPARK-14597 > URL: https://issues.apache.org/jira/browse/SPARK-14597 > Project: Spark > Issue Type: Improvement > Components: Spark Core, Streaming >Affects Versions: 1.6.1, 2.0.0 >Reporter: Sachin Aggarwal >Priority: Minor > > While looking to tune our streaming application, the piece of info we were > looking for was the actual processing time per batch. The > StreamingListener.onBatchCompleted event provides a BatchInfo object that > provides this information. It provides the following data > - processingDelay > - schedulingDelay > - totalDelay > - Submission Time > The above are essentially calculated from the streaming JobScheduler > clocking the processingStartTime and processingEndTime for each JobSet. > Another metric available is submissionTime, which is when a JobSet was put on > the Streaming Scheduler's queue. > > So we took processing delay as our actual processing time per batch. However, > to maintain a stable streaming application, we found that our batch > interval had to be a little less than DOUBLE the processingDelay metric > reported. (We are using a DirectKafkaInputStream.) On digging further, we > found that processingDelay only clocks time spent in the ForEachRDD > closure of the streaming application, and that JobGenerator's > graph.generateJobs > (https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L248) > method takes significantly more time. 
> Thus a true reflection of processing time is > a - Time spent in JobGenerator's Job Queue (JobGeneratorQueueDelay) > b - Time spent in JobGenerator's graph.generateJobs (JobSetCreationDelay) > c - Time spent in JobScheduler Queue for a Jobset (existing schedulingDelay > metric) > d - Time spent in Jobset's job run (existing processingDelay metric) > > Additionally a JobGeneratorQueue delay (#a) could be due to either > graph.generateJobs taking longer than batchInterval or other JobGenerator > events like checkpointing adding up time. Thus it would be beneficial to > report time taken by the checkpointing Job as well -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
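The delay arithmetic proposed above comes down to timestamp subtraction at well-defined points in a batch's life. A minimal, hypothetical model of the per-batch metrics in Python; the class and field names are invented for illustration and only loosely mirror Spark's BatchInfo:

```python
# Hypothetical model of the per-batch timing metrics discussed above.
# Field names echo Spark's BatchInfo, but this class is purely illustrative.
from dataclasses import dataclass

@dataclass
class BatchTimes:
    batch_time: int        # ms: the batch's scheduled time
    gen_start: int         # ms: graph.generateJobs began for this batch
    gen_end: int           # ms: graph.generateJobs finished
    submission_time: int   # ms: JobSet enqueued on the JobScheduler
    processing_start: int  # ms: first job of the JobSet started
    processing_end: int    # ms: last job of the JobSet finished

    @property
    def generator_queue_delay(self):  # (a) waiting in JobGenerator's queue
        return self.gen_start - self.batch_time

    @property
    def jobset_creation_delay(self):  # (b) inside graph.generateJobs
        return self.gen_end - self.gen_start

    @property
    def scheduling_delay(self):       # (c) existing metric
        return self.processing_start - self.submission_time

    @property
    def processing_delay(self):       # (d) existing metric
        return self.processing_end - self.processing_start

    @property
    def total_delay(self):
        return self.processing_end - self.batch_time

b = BatchTimes(batch_time=0, gen_start=5, gen_end=120,
               submission_time=121, processing_start=130, processing_end=380)
print(b.jobset_creation_delay, b.processing_delay, b.total_delay)
```

With numbers like these, (b) is a large fraction of (d), which is exactly the situation the reporter describes: a batch interval sized from processingDelay alone would be too small for stability.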
[jira] [Comment Edited] (SPARK-14745) CEP support in Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-14745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249261#comment-15249261 ] Mario Briggs edited comment on SPARK-14745 at 4/20/16 5:18 AM: --- Document with what CEP is, examples, features and a possible API was (Author: mariobriggs): Examples, Features and possible API > CEP support in Spark Streaming > -- > > Key: SPARK-14745 > URL: https://issues.apache.org/jira/browse/SPARK-14745 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Mario Briggs > Attachments: SparkStreamingCEP.pdf > > > Complex Event Processing is an often-used feature in Streaming applications. > Spark Streaming currently does not have a DSL/API for it. This JIRA is about > how/what we can add in Spark Streaming to support CEP out of the box -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-14745) CEP support in Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-14745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mario Briggs updated SPARK-14745: - Attachment: SparkStreamingCEP.pdf Examples, Features and possible API > CEP support in Spark Streaming > -- > > Key: SPARK-14745 > URL: https://issues.apache.org/jira/browse/SPARK-14745 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Mario Briggs > Attachments: SparkStreamingCEP.pdf > > > Complex Event Processing is an often-used feature in Streaming applications. > Spark Streaming currently does not have a DSL/API for it. This JIRA is about > how/what we can add in Spark Streaming to support CEP out of the box -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-14745) CEP support in Spark Streaming
Mario Briggs created SPARK-14745: Summary: CEP support in Spark Streaming Key: SPARK-14745 URL: https://issues.apache.org/jira/browse/SPARK-14745 Project: Spark Issue Type: New Feature Components: Streaming Reporter: Mario Briggs Complex Event Processing is an often-used feature in Streaming applications. Spark Streaming currently does not have a DSL/API for it. This JIRA is about how/what we can add in Spark Streaming to support CEP out of the box -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-14597) Streaming Listener timing metrics should include time spent in JobGenerator's graph.generateJobs
[ https://issues.apache.org/jira/browse/SPARK-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15241358#comment-15241358 ] Mario Briggs edited comment on SPARK-14597 at 4/14/16 3:32 PM: --- I think there is an opportunity to merge both your approaches above. Let me explain, taking how onOutputOperationStarted/onOutputOperationCompleted is already implemented. So rather than providing a single time metric and a single start/complete event that encompasses the generateJob for all OutputStreams, you could provide a start/complete event for each individual outputstream's generateJob, and on onBatchCompleted provide the metric for the generateJob of all OutputStreams. This way a user can also figure out if an individual outputstream is the culprit. The above would require two additional things: pass an eventLoop to the DStreamGraph.generateJobs() method. This eventLoop should not be the existing eventLoop instance in JobGenerator, but rather another, new eventLoop instance (say genJobEventLoop) in JobGenerator. This is because the existing JobGenerator.eventLoop instance's thread is used to actually drive the Job Generation, and making that thread do additional tasks will increase latency in Streaming. This new 'genJobEventLoop' will handle GenJobStarted and GenJobCompleted events and use those events to fire corresponding events to the ListenerBus and gather the generateJob metric for all outputStreams to set it in the JobSet was (Author: mariobriggs): I think there is an opportunity to merge both your approaches above. Let me explain, taking how onOutputOperationStarted/onOutputOperationCompleted is already implemented. So rather than providing a single time metric and a single start/complete event that encompasses the generateJob for all OutputStreams, you could provide a start/complete event for each individual outputstream's generateJob, and on onBatchCompleted provide the metric for the generateJob of all OutputStreams. 
This way a user can also figure out if an individual outputstream is the culprit. The above would require two additional things: pass an eventLoop to the DStreamGraph.generateJobs() method. This eventLoop should not be the existing eventLoop instance in JobGenerator, but rather another, new eventLoop instance (say genJobEventLoop) in JobGenerator. This is because the existing JobGenerator.eventLoop instance's thread is used to actually drive the Job Generation, and making that thread do additional tasks will increase latency in Streaming. This new 'genJobEventLoop' will handle GenJobStarted and GenJobCompleted events and use those events to fire corresponding events to the ListenerBus and gather the generateJob metric for all outputStreams and set it in the JobSet > Streaming Listener timing metrics should include time spent in JobGenerator's > graph.generateJobs > > > Key: SPARK-14597 > URL: https://issues.apache.org/jira/browse/SPARK-14597 > Project: Spark > Issue Type: Improvement > Components: Spark Core, Streaming >Affects Versions: 1.6.1, 2.0.0 >Reporter: Sachin Aggarwal >Priority: Minor > > While looking to tune our streaming application, the piece of info we were > looking for was the actual processing time per batch. The > StreamingListener.onBatchCompleted event provides a BatchInfo object that > provides this information. It provides the following data > - processingDelay > - schedulingDelay > - totalDelay > - Submission Time > The above are essentially calculated from the streaming JobScheduler > clocking the processingStartTime and processingEndTime for each JobSet. > Another metric available is submissionTime, which is when a JobSet was put on > the Streaming Scheduler's queue. > > So we took processing delay as our actual processing time per batch. However, > to maintain a stable streaming application, we found that our batch > interval had to be a little less than DOUBLE the processingDelay metric > reported. (We are using a DirectKafkaInputStream.) 
On digging further, we > found that processingDelay only clocks time spent in the ForEachRDD > closure of the streaming application, and that JobGenerator's > graph.generateJobs > (https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L248) > method takes significantly more time. > Thus a true reflection of processing time is > a - Time spent in JobGenerator's Job Queue (JobGeneratorQueueDelay) > b - Time spent in JobGenerator's graph.generateJobs (JobSetCreationDelay) > c - Time spent in JobScheduler Queue for a JobSet (existing schedulingDelay > metric) > d - Time spent in JobSet's job run (existing processingDelay metric)
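The dedicated event-loop idea in the comment above can be sketched generically. Below is a minimal Python stand-in (plain threading and a queue, not Spark's actual EventLoop class); the GenJobStarted/GenJobCompleted event names come from the comment, everything else is illustrative:

```python
import queue
import threading

# Minimal stand-in for a dedicated event loop: a daemon thread draining a
# queue.  The point of the design is that post() never blocks, so the
# job-generation thread is never slowed down by metric bookkeeping.
class GenJobEventLoop:
    def __init__(self):
        self.q = queue.Queue()
        self.durations = {}   # output-stream id -> generateJobs duration (ms)
        self._starts = {}
        self._t = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._t.start()

    def post(self, event):
        # Called from the job-generation thread; only enqueues, never blocks.
        self.q.put(event)

    def stop(self):
        self.q.put(("Stop", None, None))
        self._t.join()

    def _run(self):
        while True:
            kind, stream_id, ts = self.q.get()
            if kind == "Stop":
                return
            if kind == "GenJobStarted":
                self._starts[stream_id] = ts
            elif kind == "GenJobCompleted":
                self.durations[stream_id] = ts - self._starts.pop(stream_id)

loop = GenJobEventLoop()
loop.start()
loop.post(("GenJobStarted", "outputStream-0", 100))
loop.post(("GenJobCompleted", "outputStream-0", 160))
loop.stop()
print(loop.durations)   # per-output-stream generateJobs timing
```

In the real proposal, the completion handler would additionally fire listener-bus events and record the aggregate generateJobs metric on the JobSet; here the loop just accumulates durations per output stream.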
[jira] [Commented] (SPARK-14597) Streaming Listener timing metrics should include time spent in JobGenerator's graph.generateJobs
[ https://issues.apache.org/jira/browse/SPARK-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15241358#comment-15241358 ] Mario Briggs commented on SPARK-14597: -- I think there is an opportunity to merge both your approaches above. Let me explain, taking how onOutputOperationStarted/onOutputOperationCompleted is already implemented. So rather than providing a single time metric and a single start/complete event that encompasses the generateJob for all OutputStreams, you could provide a start/complete event for each individual outputstream's generateJob, and on onBatchCompleted provide the metric for the generateJob of all OutputStreams. This way a user can also figure out if an individual outputstream is the culprit. The above would require two additional things: pass an eventLoop to the DStreamGraph.generateJobs() method. This eventLoop should not be the existing eventLoop instance in JobGenerator, but rather another, new eventLoop instance (say genJobEventLoop) in JobGenerator. This is because the existing JobGenerator.eventLoop instance's thread is used to actually drive the Job Generation, and making that thread do additional tasks will increase latency in Streaming. This new 'genJobEventLoop' will handle GenJobStarted and GenJobCompleted events and use those events to fire corresponding events to the ListenerBus and gather the generateJob metric for all outputStreams and set it in the JobSet > Streaming Listener timing metrics should include time spent in JobGenerator's > graph.generateJobs > > > Key: SPARK-14597 > URL: https://issues.apache.org/jira/browse/SPARK-14597 > Project: Spark > Issue Type: Improvement > Components: Spark Core, Streaming >Affects Versions: 1.6.1, 2.0.0 >Reporter: Sachin Aggarwal >Priority: Minor > > While looking to tune our streaming application, the piece of info we were > looking for was the actual processing time per batch. 
The > StreamingListener.onBatchCompleted event provides a BatchInfo object that > provides this information. It provides the following data > - processingDelay > - schedulingDelay > - totalDelay > - Submission Time > The above are essentially calculated from the streaming JobScheduler > clocking the processingStartTime and processingEndTime for each JobSet. > Another metric available is submissionTime, which is when a JobSet was put on > the Streaming Scheduler's queue. > > So we took processing delay as our actual processing time per batch. However, > to maintain a stable streaming application, we found that our batch > interval had to be a little less than DOUBLE the processingDelay metric > reported. (We are using a DirectKafkaInputStream.) On digging further, we > found that processingDelay only clocks time spent in the ForEachRDD > closure of the streaming application, and that JobGenerator's > graph.generateJobs > (https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L248) > method takes significantly more time. > Thus a true reflection of processing time is > a - Time spent in JobGenerator's Job Queue (JobGeneratorQueueDelay) > b - Time spent in JobGenerator's graph.generateJobs (JobSetCreationDelay) > c - Time spent in JobScheduler Queue for a JobSet (existing schedulingDelay > metric) > d - Time spent in JobSet's job run (existing processingDelay metric) > > Additionally, a JobGeneratorQueue delay (#a) could be due either to > graph.generateJobs taking longer than batchInterval or to other JobGenerator > events like checkpointing adding up time. Thus it would be beneficial to > report the time taken by the checkpointing job as well -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13650) Usage of the window() function on DStream
[ https://issues.apache.org/jira/browse/SPARK-13650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15179347#comment-15179347 ] Mario Briggs commented on SPARK-13650: -- Running locally on my Mac, here is the output {quote} [Stage 0:> (0 + 0) / 2]16/03/04 10:38:18 INFO VerifiableProperties: Verifying properties 16/03/04 10:38:18 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest 16/03/04 10:38:18 INFO VerifiableProperties: Property group.id is overridden to 16/03/04 10:38:18 INFO VerifiableProperties: Property zookeeper.connect is overridden to --- Time: 1457068098000 ms --- 10 [Stage 6:===> (2 + 0) / 3] [Stage 6:===> (2 + 0) / 3] {quote} The '10' is the output from the first batch interval, at time '1457068098000 ms'. Thereafter, the only output for the next ~2 minutes is the 'Stage 6' progress bar. A ^C fails to stop the app and I need to do a 'kill -9 pid' > Usage of the window() function on DStream > - > > Key: SPARK-13650 > URL: https://issues.apache.org/jira/browse/SPARK-13650 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.5.2, 1.6.0, 2.0.0 >Reporter: Mario Briggs >Priority: Minor > > Is there some guidance on the usage of the window() function on DStream? Here > is my academic use-case, for which it fails. > Standard word count > val ssc = new StreamingContext(sparkConf, Seconds(6)) > val messages = KafkaUtils.createDirectStream(...) > val words = messages.map(_._2).flatMap(_.split(" ")) > val window = words.window(Seconds(12), Seconds(6)) > window.count().print() > For the first batch interval it gives the count and then it hangs (inside the > unionRDD) > I say the above use-case is academic, since one can achieve similar > functionality by using instead the more compact API > words.countByWindow(Seconds(12), Seconds(6)) > which works fine. 
> Is the first approach above not the intended way of using the .window() API -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
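The window()/countByWindow() behaviour discussed in this issue can be modelled without Spark. A toy Python sketch of what window(Seconds(12), Seconds(6)) means over a 6-second batch interval: each emitted window is the union of the last two batches. Names and structure are illustrative; this is not how DStream is implemented internally:

```python
# Toy model of DStream.window(12s, 6s) over a 6s batch interval:
# each emitted window is the concatenation ("union") of the last
# window/batch = 2 batches.  countByWindow is then just len() per window.
def windowed(batches, window_batches=2, slide_batches=1):
    out = []
    for end in range(1, len(batches) + 1, slide_batches):
        start = max(0, end - window_batches)
        merged = [w for b in batches[start:end] for w in b]  # unionRDD analogue
        out.append(merged)
    return out

batches = [["a", "b"], ["c"], ["d", "e", "f"]]   # words per 6s batch
windows = windowed(batches)
print([len(w) for w in windows])  # the counts countByWindow(12s, 6s) would report
```

Semantically the two APIs should agree; the reported hang is therefore about the execution of the unioned RDDs under window().count(), not about what the counts should be.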
[jira] [Created] (SPARK-13650) Usage of the window() function on DStream
Mario Briggs created SPARK-13650: Summary: Usage of the window() function on DStream Key: SPARK-13650 URL: https://issues.apache.org/jira/browse/SPARK-13650 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.6.0, 1.5.2, 2.0.0 Reporter: Mario Briggs Priority: Minor Is there some guidance on the usage of the window() function on DStream? Here is my academic use-case, for which it fails. Standard word count val ssc = new StreamingContext(sparkConf, Seconds(6)) val messages = KafkaUtils.createDirectStream(...) val words = messages.map(_._2).flatMap(_.split(" ")) val window = words.window(Seconds(12), Seconds(6)) window.count().print() For the first batch interval it gives the count and then it hangs (inside the unionRDD) I say the above use-case is academic, since one can achieve similar functionality by using instead the more compact API words.countByWindow(Seconds(12), Seconds(6)) which works fine. Is the first approach above not the intended way of using the .window() API -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13027) Add API for updateStateByKey to provide batch time as input
[ https://issues.apache.org/jira/browse/SPARK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15133643#comment-15133643 ] Mario Briggs commented on SPARK-13027: -- [~rameshaaditya117] sure, more than happy to review > Add API for updateStateByKey to provide batch time as input > --- > > Key: SPARK-13027 > URL: https://issues.apache.org/jira/browse/SPARK-13027 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.6.0 >Reporter: Aaditya Ramesh > > The StateDStream currently does not provide the batch time as input to the > state update function. This is required in cases where the behavior depends > on the batch start time. > We (Conviva) have been patching it manually for the past several Spark > versions but we thought it might be useful for others as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13027) Add API for updateStateByKey to provide batch time as input
[ https://issues.apache.org/jira/browse/SPARK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15132641#comment-15132641 ] Mario Briggs commented on SPARK-13027: -- [~rameshaaditya117] if you are tied up on something else, I could take a shot at it > Add API for updateStateByKey to provide batch time as input > --- > > Key: SPARK-13027 > URL: https://issues.apache.org/jira/browse/SPARK-13027 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.6.0 >Reporter: Aaditya Ramesh > > The StateDStream currently does not provide the batch time as input to the > state update function. This is required in cases where the behavior depends > on the batch start time. > We (Conviva) have been patching it manually for the past several Spark > versions, but we thought it might be useful for others as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
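To make the requested enhancement concrete, here is a toy, Spark-free Python model of an updateStateByKey-style reduction in which the update function additionally receives the batch time. The function names and signature are hypothetical illustrations, not the actual proposed Spark API:

```python
# Toy stand-in for updateStateByKey where the update function also receives
# the batch time (the enhancement requested in this issue).  Purely
# illustrative; real Spark streams batches, it does not take a list.
def update_state_by_key(batches, update_fn):
    state = {}
    for batch_time, pairs in batches:           # batches arrive in time order
        by_key = {}
        for k, v in pairs:
            by_key.setdefault(k, []).append(v)
        for k, vals in by_key.items():
            # batch_time is the extra argument the issue asks for
            state[k] = update_fn(batch_time, vals, state.get(k))
    return state

# Example behavior that depends on batch time:
# keep (last_seen_batch_time, running_count) per key.
def update_fn(batch_time, new_values, old_state):
    count = (old_state[1] if old_state else 0) + len(new_values)
    return (batch_time, count)

batches = [(1000, [("a", 1), ("a", 2)]), (2000, [("a", 3), ("b", 4)])]
print(update_state_by_key(batches, update_fn))
```

Without the batch-time argument, an update function like this would have to fall back on wall-clock time inside the closure, which differs from the batch's logical time on recomputation; that is presumably why Conviva patch it in.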
[jira] [Commented] (SPARK-12739) Details of batch in Streaming tab uses two Duration columns
[ https://issues.apache.org/jira/browse/SPARK-12739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15125974#comment-15125974 ] Mario Briggs commented on SPARK-12739: -- Maybe the 1st duration can be named 'Output Op Duration' and the 2nd named 'Job Duration'? [~jlaskowski] if you are tied up with something else, can I give it a shot? > Details of batch in Streaming tab uses two Duration columns > --- > > Key: SPARK-12739 > URL: https://issues.apache.org/jira/browse/SPARK-12739 > Project: Spark > Issue Type: Bug > Components: Streaming, Web UI >Affects Versions: 2.0.0 >Reporter: Jacek Laskowski >Priority: Minor > Attachments: SPARK-12739.png > > > "Details of batch" screen in Streaming tab in web UI uses two Duration > columns. I think one should be "Processing Time" while the other "Job > Duration". > See the attachment. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13009) spark-streaming-twitter_2.10 does not make it possible to access the raw twitter json
[ https://issues.apache.org/jira/browse/SPARK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15125940#comment-15125940 ] Mario Briggs commented on SPARK-13009: -- Andrew, as you yourself noted, I think it is more appropriate that the Twitter4J API add the method to retrieve the raw JSON than that the Spark API go through hoops; furthermore, this problem exists for all consumers of Twitter4J (nothing specific to Spark) > spark-streaming-twitter_2.10 does not make it possible to access the raw > twitter json > - > > Key: SPARK-13009 > URL: https://issues.apache.org/jira/browse/SPARK-13009 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.6.0 >Reporter: Andrew Davidson >Priority: Blocker > Labels: twitter > > The Streaming-twitter package makes it easy for Java programmers to work with > Twitter. The implementation returns the raw Twitter data, in JSON format, as a > twitter4J StatusJSONImpl object > JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth); > The Status class is different from the raw JSON, i.e. serializing the Status > object will not yield the same thing as the original JSON. I have downstream systems that > can only process raw tweets, not twitter4J Status objects. > Here is my bug/RFE request made to Twitter4J. > They asked that I create a Spark tracking issue. > On Thursday, January 21, 2016 at 6:27:25 PM UTC, Andy Davidson wrote: > Hi All > Quick problem summary: > My system uses the Status objects to do some analysis, however I need to > store the raw JSON. There are other systems that process that data that are > not written in Java. > Currently we are serializing the Status object. The JSON is going to break > downstream systems. > I am using the Apache Spark Streaming spark-streaming-twitter_2.10 > http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources > Request For Enhancement: > I imagine easy access to the raw JSON is a common requirement. 
Would it be > possible to add a member function getRawJson() to StatusJSONImpl? By default > the returned value would be null unless jsonStoreEnabled=True is set in the > config. > Alternative implementations: > > It should be possible to modify spark-streaming-twitter_2.10 to provide > this support. The solution is not very clean: > It would require Apache Spark to define its own Status POJO. The current > StatusJSONImpl class is marked final. > The wrapper is not going to work nicely with existing code. > spark-streaming-twitter_2.10 does not expose all of the Twitter streaming > API, so many developers are writing their own implementations of > org.apache.spark.streaming.twitter.TwitterInputDStream. This makes maintenance > difficult. It's not easy to know when the Spark implementation for Twitter has > changed. > Code listing for > spark-1.6.0/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala > private[streaming] > class TwitterReceiver( > twitterAuth: Authorization, > filters: Seq[String], > storageLevel: StorageLevel > ) extends Receiver[Status](storageLevel) with Logging { > @volatile private var twitterStream: TwitterStream = _ > @volatile private var stopped = false > def onStart() { > try { > val newTwitterStream = new > TwitterStreamFactory().getInstance(twitterAuth) > newTwitterStream.addListener(new StatusListener { > def onStatus(status: Status): Unit = { > store(status) > } > Ref: > https://forum.processing.org/one/topic/saving-json-data-from-twitter4j.html > What do people think? > Kind regards > Andy > From: on behalf of Igor Brigadir > > Reply-To: > Date: Tuesday, January 19, 2016 at 5:55 AM > To: Twitter4J > Subject: Re: [Twitter4J] trouble writing unit test > Main issue is that the Json object is in the wrong json format. > eg: "createdAt": 1449775664000 should be "created_at": "Thu Dec 10 19:27:44 > +0000 2015", ... 
> It looks like the json you have was serialized from a java Status object, > which makes json objects different to what you get from the API, > TwitterObjectFactory expects json from Twitter (I haven't had any problems > using TwitterObjectFactory instead of the Deprecated DataObjectFactory). > You could "fix" it by matching the keys & values you have with the correct, > twitter API json - it should look like the example here: > https://dev.twitter.com/rest/reference/get/statuses/show/%3Aid > But it might be easier to download the tweets again, but this time use > TwitterObjectFactory.getRawJSON(status) to get the Original Json
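The format mismatch Igor describes (twitter4j's serialized "createdAt" epoch-millis versus the Twitter API's "created_at" string) can be shown concretely. A minimal Python sketch; the two-field mapping is illustrative only, not a complete Status-to-API converter:

```python
from datetime import datetime, timezone

# Illustrative: map a couple of twitter4j-serialized Status fields back into
# Twitter REST API shape ("createdAt" epoch-millis -> "created_at" string).
def twitter_api_date(epoch_ms):
    """Format epoch milliseconds the way the Twitter REST API does (UTC)."""
    dt = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return dt.strftime("%a %b %d %H:%M:%S +0000 %Y")

serialized = {"createdAt": 1449775664000}        # twitter4j-serialized form
api_json = {"created_at": twitter_api_date(serialized["createdAt"])}
print(api_json["created_at"])
```

This kind of key-by-key repair is exactly why Igor suggests it is usually easier to re-fetch the tweets and keep TwitterObjectFactory.getRawJSON(status) in the first place.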
[jira] [Commented] (SPARK-13009) spark-streaming-twitter_2.10 does not make it possible to access the raw twitter json
[ https://issues.apache.org/jira/browse/SPARK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15127718#comment-15127718 ] Mario Briggs commented on SPARK-13009: -- IMHO, even if StatusJSONImpl were not final, this should still be done in Twitter4J > spark-streaming-twitter_2.10 does not make it possible to access the raw > twitter json > - > > Key: SPARK-13009 > URL: https://issues.apache.org/jira/browse/SPARK-13009 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.6.0 >Reporter: Andrew Davidson >Priority: Blocker > Labels: twitter > > The Streaming-twitter package makes it easy for Java programmers to work with > Twitter. The implementation returns the raw Twitter data, in JSON format, as a > twitter4J StatusJSONImpl object > JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth); > The Status class is different from the raw JSON, i.e. serializing the Status > object will not yield the same thing as the original JSON. I have downstream systems that > can only process raw tweets, not twitter4J Status objects. > Here is my bug/RFE request made to Twitter4J. > They asked that I create a Spark tracking issue. > On Thursday, January 21, 2016 at 6:27:25 PM UTC, Andy Davidson wrote: > Hi All > Quick problem summary: > My system uses the Status objects to do some analysis, however I need to > store the raw JSON. There are other systems that process that data that are > not written in Java. > Currently we are serializing the Status object. The JSON is going to break > downstream systems. > I am using the Apache Spark Streaming spark-streaming-twitter_2.10 > http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources > Request For Enhancement: > I imagine easy access to the raw JSON is a common requirement. Would it be > possible to add a member function getRawJson() to StatusJSONImpl? By default > the returned value would be null unless jsonStoreEnabled=True is set in the > config. 
> Alternative implementations: > > It should be possible to modify spark-streaming-twitter_2.10 to provide > this support. The solution is not very clean: > It would require Apache Spark to define its own Status POJO. The current > StatusJSONImpl class is marked final. > The wrapper is not going to work nicely with existing code. > spark-streaming-twitter_2.10 does not expose all of the Twitter streaming > API, so many developers are writing their own implementations of > org.apache.spark.streaming.twitter.TwitterInputDStream. This makes maintenance > difficult. It's not easy to know when the Spark implementation for Twitter has > changed. > Code listing for > spark-1.6.0/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala > private[streaming] > class TwitterReceiver( > twitterAuth: Authorization, > filters: Seq[String], > storageLevel: StorageLevel > ) extends Receiver[Status](storageLevel) with Logging { > @volatile private var twitterStream: TwitterStream = _ > @volatile private var stopped = false > def onStart() { > try { > val newTwitterStream = new > TwitterStreamFactory().getInstance(twitterAuth) > newTwitterStream.addListener(new StatusListener { > def onStatus(status: Status): Unit = { > store(status) > } > Ref: > https://forum.processing.org/one/topic/saving-json-data-from-twitter4j.html > What do people think? > Kind regards > Andy > From: on behalf of Igor Brigadir > > Reply-To: > Date: Tuesday, January 19, 2016 at 5:55 AM > To: Twitter4J > Subject: Re: [Twitter4J] trouble writing unit test > Main issue is that the Json object is in the wrong json format. > eg: "createdAt": 1449775664000 should be "created_at": "Thu Dec 10 19:27:44 > +0000 2015", ... 
> It looks like the json you have was serialized from a Java Status object,
> which makes the json objects different from what you get from the API.
> TwitterObjectFactory expects json from Twitter (I haven't had any problems
> using TwitterObjectFactory instead of the deprecated DataObjectFactory).
> You could "fix" it by matching the keys & values you have with the correct
> Twitter API json - it should look like the example here:
> https://dev.twitter.com/rest/reference/get/statuses/show/%3Aid
> But it might be easier to download the tweets again, and this time use
> TwitterObjectFactory.getRawJSON(status) to get the original JSON from the
> Twitter API, and save that for later. (You must have jsonStoreEnabled=True in
> your config, and call getRawJSON in the same thread as .showStatus() or
>
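A minimal sketch of the pattern Igor describes, directly against Twitter4J (no Spark involved); the credential setup and what you do with the raw string are placeholders, and the `jsonStoreEnabled` requirement and same-thread constraint are as stated above:

```scala
import twitter4j._
import twitter4j.conf.ConfigurationBuilder

// Sketch: enable the JSON store, then grab the raw API JSON on the same
// thread that delivered the Status. Assumes OAuth credentials are supplied
// via twitter4j.properties or system properties.
object RawJsonSketch {
  def main(args: Array[String]): Unit = {
    val conf = new ConfigurationBuilder()
      .setJSONStoreEnabled(true) // without this, getRawJSON returns null
      .build()

    val stream = new TwitterStreamFactory(conf).getInstance()
    stream.addListener(new StatusAdapter {
      override def onStatus(status: Status): Unit = {
        // Must be called on the dispatching thread, before the next status
        // arrives, or the stored raw JSON for this Status is gone.
        val rawJson: String = TwitterObjectFactory.getRawJSON(status)
        println(rawJson) // hand the untouched API JSON to downstream systems
      }
    })
    stream.sample()
  }
}
```

A receiver built this way could `store(rawJson)` strings instead of Status objects, sidestepping the StatusJSONImpl serialization mismatch entirely.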
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15110994#comment-15110994 ] Mario Briggs commented on SPARK-12177:
--
bq. If one uses the kafka v9 jar even when using the old consumer API, it can only work against a Kafka v9 broker.
I tried it on a single-system setup (v0.9 client talking to v0.8 server side) and the consumers had a problem (old or new). The producers, though, worked fine. So you are right. So then we will have kafka-assembly and kafka-assembly-v09/new, each including their version of the Kafka jars, right? (I guess you were all along thinking two different assemblies, and I guessed the other way round. Duh, IRC might have been faster.)
With the above confirmed, it automatically throws out 'The public API signatures (of KafkaUtils in the v0.9 subproject) are different and do not clash (with KafkaUtils in the original kafka subproject) and hence can be added to the existing (original kafka subproject) KafkaUtils class.'
So the only thing left, it seems, is to use 'new' or a better term instead of 'v09', since we both agree on that. Great, and thanks Mark. How's the 'python/pyspark/streaming/kafka-v09(new).py' going?
> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.6.0
> Reporter: Nikita Tarasenko
> Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not
> compatible with the old one. So I added the new consumer API. I made separate
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I
> didn't remove the old classes, for more backward compatibility. Users will not need
> to change their old Spark applications when they upgrade to the new Spark version.
> Please review my changes
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15109056#comment-15109056 ] Mario Briggs commented on SPARK-12177:
--
bq. I totally understand what you mean. However, kafka has its own assembly in Spark and the way the code is structured right now, both the new API and old API would go in the same assembly so it's important to have a different package name. Also, I think for our end users transitioning from old to new API, I foresee them having 2 versions of their spark-kafka app. One that works with the old API and one with the new API. And, I think it would be an easier transition if they could include both the kafka API versions in the spark classpath and pick and choose which app to run without mucking with maven dependencies and re-compiling when they want to switch. Let me know if you disagree.
Since this is WIP, I myself have at least one more option different from what I suggested above... I put just one out to get the conversation rolling, so thanks for chipping in.
Thanks for bringing out the kafka-assembly part. The assembly jar does include all dependencies too, so we would include Kafka v0.9's jars, I guess? I remember Cody mentioning that a v0.8 to v0.9 upgrade on the Kafka side involves upgrading the brokers. I think that is not required when the client uses a v0.9 jar but consumes only the older high-level/low-level API and talks to a v0.8 Kafka cluster. So we can go with one kafka-assembly, and then my suggestion above is itself broken, since we would have issues of the same package & class names.
One thought around not introducing the version in the package name or class name (I see that Flink does it in the class name) was to avoid forcing us to create v0.10/v0.11 packages (and customers to change code and recompile) even if those releases of Kafka don't have client-API or other such changes that warrant a new version. (Also, Spark got away without putting a version # till now, which means less work in Spark, so I am not sure we want to start forcing this work going forward.) Once we introduce the version #, we need to ensure it is in sync with Kafka. That's why one earlier idea I mentioned in this JIRA was 'The public API signatures (of KafkaUtils in the v0.9 subproject) are different and do not clash (with KafkaUtils in the original kafka subproject) and hence can be added to the existing (original kafka subproject) KafkaUtils class.' This also addresses the issues you mention above. Cody mentioned that we need to get others on the same page for this idea, so I guess we really need the committers to chime in here.
Of course, I forgot to answer Nikita's follow-up question: 'do you mean that we would change the original KafkaUtils by adding new functions for the new DirectIS/KafkaRDD but using them from a separate module with kafka09 classes'? To be clear, these new public methods added to the original kafka subproject's 'KafkaUtils' will make use of the DirectKafkaInputDStream, KafkaRDD, KafkaRDDPartition and OffsetRange classes that are in a new v09 package (internal, of course). In short, we don't have a new subproject.
(I skipped the KafkaCluster class from the list, because I am thinking it makes more sense to call this class something like 'KafkaClient' instead going forward)
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105560#comment-15105560 ] Mario Briggs commented on SPARK-12177:
--
Hi Nikita, great.
1 - We should also have a python/pyspark/streaming/kafka-v09.py that matches our external/kafka-v09.
2 - Why do you have the Broker.scala class? Unless I am missing something, it should be knocked off.
3 - I think the package should be 'org.apache.spark.streaming.kafka' only in external/kafka-v09, and not 'org.apache.spark.streaming.kafka.v09'. This is because we produce a jar with a different name (the user picks which one, and even on a mismatch it errors correctly, since the KafkaUtils method signatures are different).
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15082674#comment-15082674 ] Mario Briggs commented on SPARK-12177:
--
implemented here - https://github.com/mariobriggs/spark/commit/2fcbb721b99b48e336ba7ef7c317c279c9483840
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15081565#comment-15081565 ] Mario Briggs commented on SPARK-12177:
--
Confirmed that the frequent KafkaConsumer object creation was the reason for the consumer.position() method hang. Cleaned up the frequent KafkaConsumer object creation (https://github.com/mariobriggs/spark/commit/e40df7ee70fd72418969e8f9c81a1fee304b8b1c) and it resolved the issue.
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15075230#comment-15075230 ] Mario Briggs commented on SPARK-12177:
--
You could also get just a few of the records you want, i.e. not all in one shot. A gist below:

override def getNext(): R = {
  if (iter == null || !iter.hasNext) {
    iter = consumer.poll(pollTime).iterator()
  }
  if (!iter.hasNext) {
    if (requestOffset < part.untilOffset) {
      // need to make another poll() and recheck above.
      // So make a recursive call, i.e. 'return getNext()' here?
    }
    finished = true
    null.asInstanceOf[R]
  } else { ...
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15075172#comment-15075172 ] Mario Briggs commented on SPARK-12177:
--
Nikita, thank you.
A-C: Looks good to me. (BTW, I didn't review changes related to the receiver-based approach, even in the earlier round.)
D - I think it is OK for KafkaTestUtils to have a dependency on core, since that is more of our internal test approach (however, I haven't spent time thinking about whether even that can be bettered). On the higher issue, I think Kafka *will* provide TopicPartition as serializable, which will make this moot, but good that we have tracked it here.
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15075205#comment-15075205 ] Mario Briggs commented on SPARK-12177:
--
Very good point about the frequent creation of KafkaConsumer. In fact, Praveen is investigating whether that is the reason the 'position()' method hangs when we have batch intervals at 200ms and below.
So one way to try to optimize it is this: since the 'compute' method in DirectKafkaInputDStream runs in the driver, why not store the 'KafkaConsumer', rather than the KafkaCluster, as a member variable in this class. Of course, we will need to mark it transient so that it is not serialized, and that means always checking for null and re-initializing if required before use. The only use of the consumer here is to find the new latest offsets, so we will have to massage that method for use with an existing consumer object. Another option is to let KafkaCluster have a KafkaConsumer instance as a member variable, with the same noted aspects about being transient.
This also means moving the part about fetching the leader ipAddress for getPreferredLocations() away from KafkaRDD.getPartitions() to DirectKafkaInputDStream.compute(), and having 'leaders' as a constructor param to KafkaRDD (I now realize that KafkaRDD is private, so we are not having that on a public API as I thought earlier).
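The transient-member idea above can be sketched as follows. This is illustrative only, not the actual Spark source: the class and method names are placeholders, and it assumes the Kafka 0.9 kafka-clients API (`assign`, varargs `seekToEnd`, `position`):

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Sketch: one driver-side KafkaConsumer kept alive across batches instead of
// being created per trigger. @transient keeps it out of DStream/checkpoint
// serialization, so it must be re-created lazily after deserialization.
class DriverOffsetFetcher(kafkaParams: ju.Map[String, Object],
                          partitions: Seq[TopicPartition]) extends Serializable {

  @transient private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _

  private def ensureConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
    if (consumer == null) { // null after deserialization or on first use
      consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
      consumer.assign(partitions.asJava)
    }
    consumer
  }

  // Called from compute() on the driver to find the new latest offsets
  def latestOffsets(): Map[TopicPartition, Long] = {
    val c = ensureConsumer()
    c.seekToEnd(partitions: _*) // Kafka 0.9 varargs signature
    partitions.map(tp => tp -> c.position(tp)).toMap
  }
}
```

Reusing one consumer this way avoids the per-batch connect/metadata-fetch cost that the comment above suspects is behind the position() hang at small batch intervals.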
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15074131#comment-15074131 ] Mario Briggs commented on SPARK-12177:
--
With regards to review comment 'C' (two comments above): the Kafka folks have clarified that the timeout parameter on the poll() method is the time to spend waiting until the client side gets the data (from the server), and not the time to spend waiting for data to become available on the server. This means that one might have to call poll() more than once, till we get to the ConsumerRecord at 'untilOffset'.
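The "poll() more than once" point can be shown with a pure-logic illustration. No real Kafka here: `batches` is a stand-in for successive `KafkaConsumer.poll` results, where a call may legitimately come back empty (the timeout elapsed before data reached the client), so the caller loops until the range up to `untilOffset` is drained. All names are hypothetical:

```scala
// Drain offsets [fromOffset, untilOffset) from a sequence of poll results,
// tolerating empty batches. Each element of `batches` models one poll() call.
def drainRange(batches: Iterator[Seq[Long]],
               fromOffset: Long,
               untilOffset: Long): Seq[Long] = {
  val out = scala.collection.mutable.ArrayBuffer[Long]()
  var expected = fromOffset
  while (expected < untilOffset && batches.hasNext) {
    // A real implementation would also bound the number of retries.
    for (off <- batches.next() if off >= expected && off < untilOffset) {
      out += off
      expected = off + 1
    }
  }
  // Caller should assert(expected == untilOffset), or data was silently skipped.
  out.toList
}

// Three polls to cover offsets [0, 5): the first poll times out empty.
val polls = Iterator(Seq.empty[Long], Seq(0L, 1L), Seq(2L, 3L, 4L))
println(drainRange(polls, 0L, 5L)) // List(0, 1, 2, 3, 4)
```

The trailing assertion mirrors the check argued for in review comment 'C': if the loop gives up before `untilOffset`, that must surface as an error, not silent data loss.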
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073035#comment-15073035 ] Mario Briggs commented on SPARK-12177:
--
On the issue of lots of duplicate code from the original: one thought was whether we need to upgrade the older receiver-based approach to the new consumer API at all. The direct approach has so many benefits over the older receiver-based approach, and I can't think of a drawback, so one might make the argument that we don't upgrade the latter: it remains on the older Kafka consumer API and gets deprecated over a long period of time. Thoughts?
If we do go the above way, then there is very trivial overlap of code between the original and this new consumer implementation. The public API signatures are different and do not clash, and hence can be added to the existing KafkaUtils class.
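The "signatures do not clash" claim can be sketched with overloads. Everything below is hypothetical (placeholder types standing in for the old `kafka.common.TopicAndPartition` and new `org.apache.kafka.common.TopicPartition`, and toy return values), not the actual Spark API; the point is only that differently-typed parameter lists let old-API and new-API entry points coexist on one object:

```scala
// Placeholder stand-ins for the old and new Kafka partition types.
case class TopicAndPartition(topic: String, partition: Int) // old API style
case class TopicPartition(topic: String, partition: Int)    // new API style

object KafkaUtilsSketch {
  // Old-consumer-API style entry point: broker list + old partition type.
  def createDirectStream(brokerList: String,
                         fromOffsets: Map[TopicAndPartition, Long]): String =
    s"old-api stream over ${fromOffsets.size} partition(s) via $brokerList"

  // New-consumer-API style entry point: kafkaParams map + new partition type.
  // The first parameter types differ, so the erased signatures do not clash.
  def createDirectStream(kafkaParams: Map[String, String],
                         fromOffsets: Map[TopicPartition, Long]): String =
    s"new-api stream over ${fromOffsets.size} partition(s)"
}
```

Note the caveat in the comment: with the same first parameter type, the two `Map[..., Long]` arguments alone would clash after erasure, so the distinct leading parameters are doing real work here.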
[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073020#comment-15073020 ] Mario Briggs edited comment on SPARK-12177 at 12/28/15 7:02 PM:
Hi Nikita, thanks. Here are my review comments. I couldn't find a way to add them on the PR review in GitHub, so I added them here. My comments are a little detailed, since I too developed an implementation and tested it along with my colleague Praveen :-) after the initial discussion on the dev list.

A - KafkaCluster class
getPartitions()
seek()
callers of the above methods
all other methods that use withConsumer()
These should not return an 'Either', but rather just the expected object (the 'Right'). The reason for the 'Either' object in the earlier code was that the earlier Kafka client had to deal with trying the operation on all the 'seedBrokers' and handle the case where some of them were down. Similarly, when dealing with 'leaders', the client had to try the operation on all leaders for a TP (TopicAndPartition). When we use the new kafka-clients API, we don't have to deal with trying against all the seedBrokers, leaders etc., since the new KafkaConsumer object internally handles all those details. Notice that in the earlier code, withBrokers() tries to connect() and invoke the passed-in function multiple times with brokers.forEach(), and hence the need to accumulate errors. The earlier code also did a 'return' immediately when successful with one of the brokers. This does not apply with the new KafkaConsumer object.
getPartitions() - https://github.com/nikit-os/spark/blob/kafka-09-consumer-api/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala#L62
The consumer.partitionsFor() Java API will return null if the topic doesn't exist.
If you don't handle that, you run into an NPE when the user specifies a topic that doesn't exist or makes a typo in the topic name (also, not returning an exception saying the partition doesn't exist is not right). Our implementation is at https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala. If it is easier for you that we issue a PR to your repo, let us know.

B - KafkaRDD class
getPreferredLocations()
This method is missing in your code. The earlier implementation from Cody had the optimisation that if Kafka and the Spark code (KafkaRDD) were running on the same cluster, then the RDD partition for a particular TopicPartition would be local to that TopicPartition's leader. Could you please add code to bring back this functionality. Our implementation pulled this info inside getPartitions - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L52. It is probably more efficient to do it inside compute() of the DStream, but that meant exposing 'leaders' on the KafkaRDD constructor, as discussed - https://mail-archives.apache.org/mod_mbox/spark-dev/201512.mbox/%3CCAKWX9VV34Dto9irT3ZZfH78EoXO3bV3VHN8YYvTxfnyvGcRsQw%40mail.gmail.com%3E

C - KafkaRDDIterator class
getNext()
As mentioned in issue #1 noted here - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/README.md#issues-noted-with-kafka-090-consumer-api - KafkaConsumer.poll(x) is not guaranteed to return the data when x is a small value. We are following up on this issue with Kafka - https://issues.apache.org/jira/browse/KAFKA-3044.
I see that you have made this a configurable value in your implementation, which is good. But either way, till this behaviour is clarified, and even otherwise, we need this assert - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L171 - or else we will be silently skipping data without the user knowing it (with either the default value or a user-specified smaller value).

D - Non-use of the TopicPartition class of the new consumer
You have already figured out that this class is not Serializable, and hence in the public interface you have used the older TopicAndPartition class. We have raised this issue with Kafka - https://issues.apache.org/jira/browse/KAFKA-3029 - and may be provided with one (yet to see). However, using the older TopicAndPartition class in our public API introduces a dependency on the older kafka core rather than just the kafka-clients jar, which I would think is not the preferred approach. If we are not provided with a serializable TopicPartition, then we should rather use our own serializable object (or just a tuple of String, Int, Long) inside DirectKafkaInputDStream to ensure it is Serializable.
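The fallback in point D could be as small as the sketch below. The class name is illustrative, not anything in Spark or Kafka; it assumes the Kafka 0.9 kafka-clients jar on the classpath for the boundary conversion:

```scala
import org.apache.kafka.common.TopicPartition

// Sketch: a serializable stand-in for kafka-clients' TopicPartition (not
// Serializable in 0.9.0, per KAFKA-3029), used internally by the DStream and
// converted back at the Kafka API boundary. Case classes are Serializable.
case class SerializableTopicPartition(topic: String, partition: Int) {
  def toKafka: TopicPartition = new TopicPartition(topic, partition)
}

object SerializableTopicPartition {
  // Convert from the kafka-clients type at the boundary.
  def apply(tp: TopicPartition): SerializableTopicPartition =
    SerializableTopicPartition(tp.topic(), tp.partition())
}
```

Because it is a case class, equality and hashing are structural, so it also works as a Map key for offset ranges; the conversion cost at the boundary is negligible next to a broker round trip.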
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073020#comment-15073020 ]

Mario Briggs commented on SPARK-12177:
--

Hi Nikita, thanks. Here are my review comments. I couldn't find a way to add them on the PR review in GitHub, so I added them here. My comments are a little detailed, since I too developed an implementation and tested it along with my colleague Praveen :-) after the initial discussion on the dev list.

A - KafkaCluster class

getPartitions(), seek(), the callers of those methods, and all other methods that use withConsumer() should not return an ‘Either’, but rather just the expected object (the ‘Right’). The ‘Either’ in the earlier code existed because the earlier Kafka client had to try the operation against all the ‘seedBrokers’ and handle the case where some of them were down; similarly, when dealing with ‘leaders’, the client had to try the operation against all leaders for a TP (TopicAndPartition). With the new kafka-clients API we no longer have to try against all the seedBrokers or leaders, since the new KafkaConsumer object handles all those details internally. Notice that in the earlier code, withBrokers() tried connect() and the passed-in function multiple times via brokers.forEach(), hence the need to accumulate errors; it also returned immediately once it succeeded against one of the brokers. None of that applies with the new KafkaConsumer object.

getPartitions() - https://github.com/nikit-os/spark/blob/kafka-09-consumer-api/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala#L62
The consumer.partitionsFor() Java API returns null if the topic doesn’t exist.
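As a sketch of the two points above (return the value directly instead of an ‘Either’, and guard against the null from partitionsFor()), something like the following — with minimal stand-in types so it is self-contained, and all names illustrative rather than the PR's actual API:

```scala
import scala.collection.JavaConverters._

// Stand-ins for the kafka-clients types, so this sketch compiles on its own;
// in real code these are org.apache.kafka.common.{PartitionInfo, TopicPartition}.
case class PartitionInfo(topic: String, partition: Int)
case class TopicPartition(topic: String, partition: Int)

// Return the partitions directly (no Either): the new KafkaConsumer retries
// brokers internally, so accumulating per-broker errors is no longer needed.
// partitionsFor() returns null for an unknown topic, so fail fast instead of NPE-ing.
def getPartitions(partitionsFor: String => java.util.List[PartitionInfo],
                  topics: Set[String]): Set[TopicPartition] = {
  topics.flatMap { topic =>
    val infos = partitionsFor(topic)
    require(infos != null, s"Topic '$topic' does not exist")
    infos.asScala.map(pi => TopicPartition(pi.topic, pi.partition))
  }
}
```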
If you don’t handle that, you run into an NPE when the user specifies a topic that doesn’t exist or makes a typo in the topic name (also, not returning an exception saying the partition doesn’t exist is not right). Our implementation is at https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala. If it is easier for you that we issue a PR to your repo, let us know.

B - KafkaRDD class

getPreferredLocations() is missing in your code. The earlier implementation from Cody had the optimisation that if Kafka and the Spark code (KafkaRDD) were running on the same cluster, the RDD partition for a particular TopicPartition would be local to that TopicPartition's leader. Could you please add code to bring back this functionality? Our implementation pulled this info inside getPartitions() - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L52. It would probably be more efficient to do it inside compute() of the DStream, but that meant exposing ‘leaders’ on the KafkaRDD constructor, as discussed - https://mail-archives.apache.org/mod_mbox/spark-dev/201512.mbox/%3CCAKWX9VV34Dto9irT3ZZfH78EoXO3bV3VHN8YYvTxfnyvGcRsQw%40mail.gmail.com%3E

C - KafkaRDDIterator class

getNext(): as mentioned in issue #1 noted here - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/README.md#issues-noted-with-kafka-090-consumer-api - KafkaConsumer.poll(x) is not guaranteed to return the data when x is a small value.
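A defensive shape for that poll() behaviour might look like the following sketch — poll is passed in as a plain function so the sketch is self-contained, and the names and timeouts are illustrative, not the actual KafkaRDDIterator code:

```scala
// KafkaConsumer.poll(smallTimeout) may return no records even when data exists
// at the requested offsets. Until that behaviour is clarified upstream, retry up
// to a deadline and assert, rather than silently treating "empty" as "no data".
def fetchBatch[R](poll: Long => Seq[R], pollTimeMs: Long, maxWaitMs: Long): Seq[R] = {
  val deadline = System.currentTimeMillis() + maxWaitMs
  var records = poll(pollTimeMs)
  while (records.isEmpty && System.currentTimeMillis() < deadline) {
    records = poll(pollTimeMs)  // small poll returned nothing; try again
  }
  assert(records.nonEmpty,
    s"poll($pollTimeMs) returned no records within ${maxWaitMs}ms for offsets known " +
      "to exist; failing instead of silently skipping data")
  records
}
```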
We are following up this issue with Kafka - https://issues.apache.org/jira/browse/KAFKA-3044. I see that you have made this a configurable value in your implementation, which is good, but until this behaviour is clarified (and even otherwise) we need this assert - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L171 - or else we will be silently skipping data without the user knowing it (whether with the default value or a user-specified smaller value).

D - Non-use of the new consumer's TopicPartition class

You have already figured out that this class is not Serializable, and hence used the older TopicAndPartition class in the public interface. We have raised this issue with Kafka - https://issues.apache.org/jira/browse/KAFKA-3029 - and may be provided with a serializable one (yet to see). However, using the older TopicAndPartition class in our public API introduces a dependency on the older kafka core rather than just the kafka-clients jar, which I would think is not the preferred approach. If we are not provided with a serializable TopicPartition, then we should rather use our own serializable object (or just a tuple of (String, Int, Long)) inside DirectKafkaInputDStream to ensure it is Serializable.

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
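The serializable stand-in suggested under D could be as simple as the sketch below; the class name and the Java-serialization round-trip check are illustrative, not from either implementation:

```scala
import java.io._

// A serializable replacement for the new consumer's (non-Serializable)
// TopicPartition, carried inside DirectKafkaInputDStream and converted back
// to a real TopicPartition at the edges. Name is illustrative only.
case class OffsetRangeKey(topic: String, partition: Int, fromOffset: Long)

// Round-trip through Java serialization, as would happen when the value
// crosses a task/closure boundary in Spark.
def roundTrip[T <: Serializable](t: T): T = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(t)
  oos.close()
  new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    .readObject().asInstanceOf[T]
}
```

Case classes get structural equality for free, so the round-tripped copy compares equal to the original.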