[jira] [Commented] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval

2016-11-07 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15646665#comment-15646665
 ] 

Mario Briggs commented on SPARK-16545:
--

[~lwlin] I agree with the PR discussion. I am not entirely sure what the value 
of the 'Resolution' field should be when closing this; 'Later', for example, to 
indicate that it is being fixed elsewhere.

> Structured Streaming : foreachSink creates the Physical Plan multiple times 
> per TriggerInterval 
> 
>
> Key: SPARK-16545
> URL: https://issues.apache.org/jira/browse/SPARK-16545
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0
>Reporter: Mario Briggs
>







[jira] [Commented] (SPARK-17630) jvm-exit-on-fatal-error handler for spark.rpc.netty like there is available for akka

2016-10-18 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587664#comment-15587664
 ] 

Mario Briggs commented on SPARK-17630:
--

[~zsxwing] thanks much. Any pointers on how/where to add the code, or something 
existing in the code base to look at? I can then try a PR.
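
For discussion, here is a minimal sketch of the kind of handler I have in mind: 
an uncaught-exception handler that halts the JVM on fatal errors, similar in 
spirit to what 'akka.jvm-exit-on-fatal-error' gives us. This is not Spark's 
existing code; the object name and where it would be installed are assumptions.

{code}
object ExitOnFatalErrorHandler extends Thread.UncaughtExceptionHandler {
  override def uncaughtException(thread: Thread, throwable: Throwable): Unit = throwable match {
    case fatal: OutOfMemoryError =>
      // Mirror akka.jvm-exit-on-fatal-error: report the error and halt, so a caller
      // such as py4J sees the process die with a stack trace instead of hanging.
      System.err.println(s"Fatal error in thread ${thread.getName}, exiting JVM")
      fatal.printStackTrace(System.err)
      Runtime.getRuntime.halt(1)
    case other =>
      other.printStackTrace(System.err)
  }

  // Register as the default handler for threads that do not install their own.
  def install(): Unit = Thread.setDefaultUncaughtExceptionHandler(this)
}
{code}

Presumably the interesting part is wiring something like this into the threads 
used by the netty RPC layer, which is where I could use pointers.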

> jvm-exit-on-fatal-error handler for spark.rpc.netty like there is available 
> for akka
> 
>
> Key: SPARK-17630
> URL: https://issues.apache.org/jira/browse/SPARK-17630
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Mario Briggs
> Attachments: SecondCodePath.txt, firstCodepath.txt
>
>
> Hi,
> I have 2 code paths in my app that result in a JVM OOM.
> In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts
> down the JVM, so the caller (py4J) gets notified with a proper stack trace.
> See the attached stack-trace file (firstCodepath.txt).
> In the 2nd code path (rpc.netty), no such handler kicks in to shut down the
> JVM, so the caller does not get notified.
> See the attached stack-trace file (SecondCodepath.txt).
> Is it possible to have a JVM exit handler for the rpc.netty path?
>  






[jira] [Comment Edited] (SPARK-17917) Convert 'Initial job has not accepted any resources..' logWarning to a SparkListener event

2016-10-18 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15574639#comment-15574639
 ] 

Mario Briggs edited comment on SPARK-17917 at 10/18/16 6:49 PM:


>>
I don't have a strong feeling on this partly because I'm not sure what the 
action then is – kill the job?
<<
Here is an example: let's say I am using a notebook and kicked off some Spark 
actions that don't get executors because the user/org/group quota of executors 
has been exhausted. The notebook implementor can use these events to surface 
the issue to the user via a UI update on that cell itself, perhaps additionally 
query the user/org/group quotas, show which apps are using up the quota, and 
let the user take whatever action is required (kill the other jobs, just wait 
on this job, etc.). So I am not looking to define, on the event itself, what 
the set of possible actions is, since that would be very implementation 
specific.

>>
Maybe, I suppose it will be a little tricky to define what the event is here
<<
Were you referring to the actual arguments of the event method? I can take a 
shot at defining them and then look for feedback.



was (Author: mariobriggs):
>>
I don't have a strong feeling on this partly because I'm not sure what the 
action then is – kill the job?
<<
Here is an example - Lets say i am using a notebook and kicked off some spark 
actions that dont' get executors because user/org/group quota's of executors 
have been exhausted. These events can be used by the notebook implementor to 
then surface the issue to the user via a UI update on that cell itself, maybe 
even additionally query the user/org/group quota's show which apps are using up 
the quota's etc and allow the user to take what action required (kill the other 
jobs, just wait on this job etc). Therefore not looking to define in anyway on 
the event, what the set of actions can be, since that would be very 
implementation specific.

>>
Maybe, I suppose it will be a little tricky to define what the event is here
<<
Where you referring to the actual arguments of the event method. I can give a 
shot at defining and then look for feedback


> Convert 'Initial job has not accepted any resources..' logWarning to a 
> SparkListener event
> --
>
> Key: SPARK-17917
> URL: https://issues.apache.org/jira/browse/SPARK-17917
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Mario Briggs
>Priority: Minor
>
> When supporting Spark on a large, multi-tenant shared cluster with quotas per 
> tenant, a submitted taskSet often does not get executors because quotas have 
> been exhausted or resources are unavailable. In these situations, firing a 
> SparkListener event instead of just logging the issue (as done currently at 
> https://github.com/apache/spark/blob/9216901d52c9c763bfb908013587dcf5e781f15b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L192)
> would give applications/listeners an opportunity to handle this more 
> appropriately as needed.






[jira] [Commented] (SPARK-17917) Convert 'Initial job has not accepted any resources..' logWarning to a SparkListener event

2016-10-14 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15574639#comment-15574639
 ] 

Mario Briggs commented on SPARK-17917:
--

>>
I don't have a strong feeling on this partly because I'm not sure what the 
action then is – kill the job?
<<
Here is an example: let's say I am using a notebook and kicked off some Spark 
actions that don't get executors because the user/org/group quota of executors 
has been exhausted. The notebook implementor can use these events to surface 
the issue to the user via a UI update on that cell itself, perhaps additionally 
query the user/org/group quotas, show which apps are using up the quota, and 
let the user take whatever action is required (kill the other jobs, just wait 
on this job, etc.). So I am not looking to define, on the event itself, what 
the set of possible actions is, since that would be very implementation 
specific.

>>
Maybe, I suppose it will be a little tricky to define what the event is here
<<
Were you referring to the actual arguments of the event method? I can take a 
shot at defining them and then look for feedback.


> Convert 'Initial job has not accepted any resources..' logWarning to a 
> SparkListener event
> --
>
> Key: SPARK-17917
> URL: https://issues.apache.org/jira/browse/SPARK-17917
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Mario Briggs
>Priority: Minor
>
> When supporting Spark on a large, multi-tenant shared cluster with quotas per 
> tenant, a submitted taskSet often does not get executors because quotas have 
> been exhausted or resources are unavailable. In these situations, firing a 
> SparkListener event instead of just logging the issue (as done currently at 
> https://github.com/apache/spark/blob/9216901d52c9c763bfb908013587dcf5e781f15b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L192)
> would give applications/listeners an opportunity to handle this more 
> appropriately as needed.






[jira] [Commented] (SPARK-17917) Convert 'Initial job has not accepted any resources..' logWarning to a SparkListener event

2016-10-13 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15572795#comment-15572795
 ] 

Mario Briggs commented on SPARK-17917:
--

Would appreciate it if the Spark devs could comment on whether they see this as 
a bad idea for some reason. 

I basically see adding 2 events to SparkListener, along the lines of
  onTaskStarved() and onTaskUnStarved() - the latter firing only if 
onTaskStarved() fired in the first place for a taskSet. A rough sketch of what 
this could look like follows.
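
For illustration only, here is a minimal sketch of what the events and an 
application-side listener might look like. None of this exists in Spark today; 
the event names, fields, and delivery via the generic onOtherEvent callback are 
assumptions for discussion.

{code}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Hypothetical event shapes; the names and fields are illustrative only.
case class SparkListenerTaskSetStarved(
    stageId: Int, stageAttemptId: Int, time: Long) extends SparkListenerEvent
case class SparkListenerTaskSetUnstarved(
    stageId: Int, stageAttemptId: Int, time: Long) extends SparkListenerEvent

// An application (e.g. a notebook backend) could then react to them,
// for instance via SparkListener.onOtherEvent:
class StarvationListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerTaskSetStarved(stage, attempt, t) =>
      println(s"Stage $stage.$attempt has not been offered any resources (as of $t)")
    case SparkListenerTaskSetUnstarved(stage, attempt, t) =>
      println(s"Stage $stage.$attempt is now receiving resources (as of $t)")
    case _ => // ignore all other events
  }
}
{code}

The actual arguments would need feedback, as mentioned above; a stage id/attempt 
and a timestamp are just the obvious minimum.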

> Convert 'Initial job has not accepted any resources..' logWarning to a 
> SparkListener event
> --
>
> Key: SPARK-17917
> URL: https://issues.apache.org/jira/browse/SPARK-17917
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Mario Briggs
>
> When supporting Spark on a large, multi-tenant shared cluster with quotas per 
> tenant, a submitted taskSet often does not get executors because quotas have 
> been exhausted or resources are unavailable. In these situations, firing a 
> SparkListener event instead of just logging the issue (as done currently at 
> https://github.com/apache/spark/blob/9216901d52c9c763bfb908013587dcf5e781f15b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L192)
> would give applications/listeners an opportunity to handle this more 
> appropriately as needed.






[jira] [Created] (SPARK-17917) Convert 'Initial job has not accepted any resources..' logWarning to a SparkListener event

2016-10-13 Thread Mario Briggs (JIRA)
Mario Briggs created SPARK-17917:


 Summary: Convert 'Initial job has not accepted any resources..' 
logWarning to a SparkListener event
 Key: SPARK-17917
 URL: https://issues.apache.org/jira/browse/SPARK-17917
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Mario Briggs


When supporting Spark on a large, multi-tenant shared cluster with quotas per 
tenant, a submitted taskSet often does not get executors because quotas have 
been exhausted or resources are unavailable. In these situations, firing a 
SparkListener event instead of just logging the issue (as done currently at 
https://github.com/apache/spark/blob/9216901d52c9c763bfb908013587dcf5e781f15b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L192)
would give applications/listeners an opportunity to handle this more 
appropriately as needed.






[jira] [Updated] (SPARK-17630) jvm-exit-on-fatal-error handler for spark.rpc.netty like there is available for akka

2016-09-22 Thread Mario Briggs (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Briggs updated SPARK-17630:
-
Summary: jvm-exit-on-fatal-error handler for spark.rpc.netty like there is 
available for akka  (was: jvm-exit-on-fatal-error for spark.rpc.netty like 
there is available for akka)

> jvm-exit-on-fatal-error handler for spark.rpc.netty like there is available 
> for akka
> 
>
> Key: SPARK-17630
> URL: https://issues.apache.org/jira/browse/SPARK-17630
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Mario Briggs
> Attachments: SecondCodePath.txt, firstCodepath.txt
>
>
> Hi,
> I have 2 code paths in my app that result in a JVM OOM.
> In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts
> down the JVM, so the caller (py4J) gets notified with a proper stack trace.
> See the attached stack-trace file (firstCodepath.txt).
> In the 2nd code path (rpc.netty), no such handler kicks in to shut down the
> JVM, so the caller does not get notified.
> See the attached stack-trace file (SecondCodepath.txt).
> Is it possible to have a JVM exit handler for the rpc.netty path?
>  






[jira] [Updated] (SPARK-17630) jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka

2016-09-21 Thread Mario Briggs (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Briggs updated SPARK-17630:
-
Description: 
Hi,

I have 2 code paths in my app that result in a JVM OOM.

In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts down 
the JVM, so the caller (py4J) gets notified with a proper stack trace. 
See the attached stack-trace file (firstCodepath.txt).

In the 2nd code path (rpc.netty), no such handler kicks in to shut down the 
JVM, so the caller does not get notified. 
See the attached stack-trace file (SecondCodepath.txt).

Is it possible to have a JVM exit handler for the rpc.netty path?


 

  was:
Hi,

I have 2 code-paths from my app that result in a jvm OOM. 

In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts down 
the JVM, so that the caller (py4J) get notified with proper stack trace

In the 2nd code path (rpc.netty), no such handler kicks in and shutdown the 
JVM, so the caller does not get notified.

Is it possible to have an jvm exit handle for the rpc. netty path?

First code path trace file - 
 


> jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka
> 
>
> Key: SPARK-17630
> URL: https://issues.apache.org/jira/browse/SPARK-17630
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Mario Briggs
> Attachments: SecondCodePath.txt, firstCodepath.txt
>
>
> Hi,
> I have 2 code paths in my app that result in a JVM OOM.
> In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts
> down the JVM, so the caller (py4J) gets notified with a proper stack trace.
> See the attached stack-trace file (firstCodepath.txt).
> In the 2nd code path (rpc.netty), no such handler kicks in to shut down the
> JVM, so the caller does not get notified.
> See the attached stack-trace file (SecondCodepath.txt).
> Is it possible to have a JVM exit handler for the rpc.netty path?
>  






[jira] [Updated] (SPARK-17630) jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka

2016-09-21 Thread Mario Briggs (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Briggs updated SPARK-17630:
-
Attachment: SecondCodePath.txt

> jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka
> 
>
> Key: SPARK-17630
> URL: https://issues.apache.org/jira/browse/SPARK-17630
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Mario Briggs
> Attachments: SecondCodePath.txt, firstCodepath.txt
>
>
> Hi,
> I have 2 code paths in my app that result in a JVM OOM.
> In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts
> down the JVM, so the caller (py4J) gets notified with a proper stack trace.
> In the 2nd code path (rpc.netty), no such handler kicks in to shut down the
> JVM, so the caller does not get notified.
> Is it possible to have a JVM exit handler for the rpc.netty path?
> First code path trace file - 
>  






[jira] [Updated] (SPARK-17630) jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka

2016-09-21 Thread Mario Briggs (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Briggs updated SPARK-17630:
-
Attachment: firstCodepath.txt

> jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka
> 
>
> Key: SPARK-17630
> URL: https://issues.apache.org/jira/browse/SPARK-17630
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Mario Briggs
> Attachments: firstCodepath.txt
>
>
> Hi,
> I have 2 code paths in my app that result in a JVM OOM.
> In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts
> down the JVM, so the caller (py4J) gets notified with a proper stack trace.
> In the 2nd code path (rpc.netty), no such handler kicks in to shut down the
> JVM, so the caller does not get notified.
> Is it possible to have a JVM exit handler for the rpc.netty path?
> First code path trace file - 
>  






[jira] [Updated] (SPARK-17630) jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka

2016-09-21 Thread Mario Briggs (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Briggs updated SPARK-17630:
-
Description: 
Hi,

I have 2 code paths in my app that result in a JVM OOM.

In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts down 
the JVM, so the caller (py4J) gets notified with a proper stack trace.

In the 2nd code path (rpc.netty), no such handler kicks in to shut down the 
JVM, so the caller does not get notified.

Is it possible to have a JVM exit handler for the rpc.netty path?

First code path trace file - 
 

  was:
Hi,

I have 2 code-paths from my app that result in a jvm OOM. 

In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts down 
the JVM, so that the caller (py4J) get notified with proper stack trace

In the 2nd code path (rpc.netty), no such handler kicks in and shutdown the 
JVM, so the caller does not get notified.

Is it possible to have an jvm exit handle for the rpc. netty path?

First code path trace
 


> jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka
> 
>
> Key: SPARK-17630
> URL: https://issues.apache.org/jira/browse/SPARK-17630
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Mario Briggs
>
> Hi,
> I have 2 code paths in my app that result in a JVM OOM.
> In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts
> down the JVM, so the caller (py4J) gets notified with a proper stack trace.
> In the 2nd code path (rpc.netty), no such handler kicks in to shut down the
> JVM, so the caller does not get notified.
> Is it possible to have a JVM exit handler for the rpc.netty path?
> First code path trace file - 
>  






[jira] [Created] (SPARK-17630) jvm-exit-on-fatal-error for spark.rpc.netty like there is available for akka

2016-09-21 Thread Mario Briggs (JIRA)
Mario Briggs created SPARK-17630:


 Summary: jvm-exit-on-fatal-error for spark.rpc.netty like there is 
available for akka
 Key: SPARK-17630
 URL: https://issues.apache.org/jira/browse/SPARK-17630
 Project: Spark
  Issue Type: Question
  Components: Spark Core
Affects Versions: 1.6.0
Reporter: Mario Briggs


Hi,

I have 2 code paths in my app that result in a JVM OOM.

In the first code path, 'akka.jvm-exit-on-fatal-error' kicks in and shuts down 
the JVM, so the caller (py4J) gets notified with a proper stack trace.

In the 2nd code path (rpc.netty), no such handler kicks in to shut down the 
JVM, so the caller does not get notified.

Is it possible to have a JVM exit handler for the rpc.netty path?

First code path trace
 






[jira] [Comment Edited] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval

2016-07-14 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378811#comment-15378811
 ] 

Mario Briggs edited comment on SPARK-16545 at 7/15/16 3:57 AM:
---

Thanks. I looked into it too and was not able to come up with a fix that I was 
satisfied with. The problem seems to be that Dataset assumes it has a 
QueryExecution with the physical plan (which is true in the batch case), since 
most of the listener/metrics-gathering functions want to dump this info, 
whereas in streaming we want only the 'inner' IncrementalExecution to produce 
the physical plan. I will submit what I have tried as well, to ease the 
discussion.


was (Author: mariobriggs):
thanks. I looked into it too and i was not getting a fix that was satisfying to 
myself. The problem seems to be that Dataset assumes it has a QueryExecution 
with the Physical Plan (which is true in the batch case), since most of the 
Listener/metrics gathering functions want to dump this info , whereas in 
streaming we want only the 'inner' IncrementalExecution to produce the 
PhysicalPlan. I will submit have i tried to do as well to ease the discussion 
points.

> Structured Streaming : foreachSink creates the Physical Plan multiple times 
> per TriggerInterval 
> 
>
> Key: SPARK-16545
> URL: https://issues.apache.org/jira/browse/SPARK-16545
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Streaming
>Affects Versions: 2.0.0
>Reporter: Mario Briggs
>







[jira] [Commented] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval

2016-07-14 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378811#comment-15378811
 ] 

Mario Briggs commented on SPARK-16545:
--

Thanks. I looked into it too and was not able to come up with a fix that I was 
satisfied with. The problem seems to be that Dataset assumes it has a 
QueryExecution with the physical plan (which is true in the batch case), since 
most of the listener/metrics-gathering functions want to dump this info, 
whereas in streaming we want only the 'inner' IncrementalExecution to produce 
the physical plan. I will submit what I have tried as well, to ease the 
discussion.

> Structured Streaming : foreachSink creates the Physical Plan multiple times 
> per TriggerInterval 
> 
>
> Key: SPARK-16545
> URL: https://issues.apache.org/jira/browse/SPARK-16545
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Streaming
>Affects Versions: 2.0.0
>Reporter: Mario Briggs
>







[jira] [Comment Edited] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval

2016-07-14 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376758#comment-15376758
 ] 

Mario Briggs edited comment on SPARK-16545 at 7/14/16 11:01 AM:


While looking at the performance of Structured Streaming, we found some 
excessive time being spent in the driver. 

Looking further into this, we found the time is spent in multiple (3, to be 
exact) initialisations of 
[QueryExecution.executedPlan|https://github.com/mariobriggs/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L85]
 due to multiple instances of QueryExecution being created in 
ForeachSink.addBatch. 

Creating the physical plan is relatively expensive and hence shouldn't be done 
more than once per TriggerInterval.


was (Author: mariobriggs):
While looking at the performance of Structured streaming, found some excessive 
time being spent in the driver. 

Further looking into this, found the time spent in multiple (3 to be exact) 
initialisations of 
[QueryExecution.executedPlan|https://github.com/mariobriggs/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L85]
 due to multiple instances of QueryExecution created in the 
forEachSink.addBatch. 

Creation of physical plan involves more time and hence shouldn't be done more 
than once

> Structured Streaming : foreachSink creates the Physical Plan multiple times 
> per TriggerInterval 
> 
>
> Key: SPARK-16545
> URL: https://issues.apache.org/jira/browse/SPARK-16545
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Streaming
>Affects Versions: 2.0.0
>Reporter: Mario Briggs
> Fix For: 2.0.0
>
>







[jira] [Comment Edited] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval

2016-07-14 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376758#comment-15376758
 ] 

Mario Briggs edited comment on SPARK-16545 at 7/14/16 10:53 AM:


While looking at the performance of Structured Streaming, we found some 
excessive time being spent in the driver. 

Looking further into this, we found the time is spent in multiple (3, to be 
exact) initialisations of 
[QueryExecution.executedPlan|https://github.com/mariobriggs/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L85]
 due to multiple instances of QueryExecution being created in 
ForeachSink.addBatch. 

Creating the physical plan is relatively expensive and hence shouldn't be done 
more than once.


was (Author: mariobriggs):
While looking at the performance of Structured streaming, found some excessive 
time being spent in the driver. 

Further looking into this, found the time spent in multiple (3 to be exact) 
initialisations of QueryExecution.executedPlan due to multiple instances of 
QueryExecution created in the forEachSink.addBatch. 

Creation of physical plan involves more time and hence shouldn't be done more 
than once

> Structured Streaming : foreachSink creates the Physical Plan multiple times 
> per TriggerInterval 
> 
>
> Key: SPARK-16545
> URL: https://issues.apache.org/jira/browse/SPARK-16545
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Streaming
>Affects Versions: 2.0.0
>Reporter: Mario Briggs
> Fix For: 2.0.0
>
>







[jira] [Commented] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval

2016-07-14 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376758#comment-15376758
 ] 

Mario Briggs commented on SPARK-16545:
--

While looking at the performance of Structured Streaming, we found some 
excessive time being spent in the driver. 

Looking further into this, we found the time is spent in multiple (3, to be 
exact) initialisations of QueryExecution.executedPlan due to multiple instances 
of QueryExecution being created in ForeachSink.addBatch. 

Creating the physical plan is relatively expensive and hence shouldn't be done 
more than once.
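
A toy illustration of the mechanism (this is not Spark code; the class below is 
made up): executedPlan behaves like a lazy val on each QueryExecution instance, 
so the expensive planning step is cached per instance, not per logical plan, and 
every fresh instance plans again.

{code}
// Toy stand-in for QueryExecution, purely to illustrate per-instance caching.
class ToyQueryExecution(logical: String) {
  lazy val executedPlan: String = {
    println(s"planning $logical ...")   // the expensive step, runs once per instance
    s"PhysicalPlan($logical)"
  }
}

val qe = new ToyQueryExecution("agg(count)")
val first  = qe.executedPlan                                   // plans once
val second = qe.executedPlan                                   // cached, no second planning
val fresh  = new ToyQueryExecution("agg(count)").executedPlan  // a new instance plans again
{code}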

> Structured Streaming : foreachSink creates the Physical Plan multiple times 
> per TriggerInterval 
> 
>
> Key: SPARK-16545
> URL: https://issues.apache.org/jira/browse/SPARK-16545
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Streaming
>Affects Versions: 2.0.0
>Reporter: Mario Briggs
> Fix For: 2.0.0
>
>







[jira] [Created] (SPARK-16545) Structured Streaming : foreachSink creates the Physical Plan multiple times per TriggerInterval

2016-07-14 Thread Mario Briggs (JIRA)
Mario Briggs created SPARK-16545:


 Summary: Structured Streaming : foreachSink creates the Physical 
Plan multiple times per TriggerInterval 
 Key: SPARK-16545
 URL: https://issues.apache.org/jira/browse/SPARK-16545
 Project: Spark
  Issue Type: Bug
  Components: SQL, Streaming
Affects Versions: 2.0.0
Reporter: Mario Briggs
 Fix For: 2.0.0









[jira] [Commented] (SPARK-15089) kafka-spark consumer with SSL problem

2016-05-04 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15270203#comment-15270203
 ] 

Mario Briggs commented on SPARK-15089:
--

Kafka supports SSL only with the 0.9.x Kafka client APIs. The Spark-Kafka 
connector you are exercising (KafkaUtils.createDirectStream) uses the 0.8 Kafka 
APIs, which do not support SSL. Your standalone Kafka consumer program works 
because it is using the 0.9 Kafka client.

If you are willing to live on the edge, this PR has the code that uses the 
Kafka 0.9 client API - https://github.com/apache/spark/pull/11863 - and the 
JIRA is https://issues.apache.org/jira/browse/SPARK-12177
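
For reference, here is a minimal sketch of a plain Kafka 0.9+ consumer with SSL 
enabled (spark-shell/REPL-style Scala; the broker host, port, topic and keystore 
paths are placeholders). The same security.protocol/ssl.* settings are what the 
consumer-based Spark integration in the PR above would take in its kafkaParams, 
though the exact Spark API there may still change.

{code}
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "broker-host:9093")   // the broker's SSL listener
props.put("group.id", "ssl-test")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("security.protocol", "SSL")
props.put("ssl.truststore.location", "/opt/cert/client.truststore.jks")
props.put("ssl.truststore.password", "password")
props.put("ssl.keystore.location", "/opt/cert/keystore.jks")
props.put("ssl.keystore.password", "password")
props.put("ssl.key.password", "password")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(java.util.Arrays.asList("myTopic"))
val records = consumer.poll(5000)                     // returns ConsumerRecords
records.asScala.foreach(r => println(s"${r.key} -> ${r.value}"))
consumer.close()
{code}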

> kafka-spark consumer with SSL problem
> -
>
> Key: SPARK-15089
> URL: https://issues.apache.org/jira/browse/SPARK-15089
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.1
>Reporter: JasonChang
>
> I am not sure Spark Streaming supports SSL.
> I tried to add params to kafkaParams, but it did not work.
> {code}
> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new 
> Duration(1));
> Set<String> topicmap = new HashSet<String>();
> topicmap.add(kafkaTopic);
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server_url);
> kafkaParams.put("security.protocol", "SSL");
> kafkaParams.put("ssl.keystore.type", "JKS");
> kafkaParams.put("ssl.keystore.location", "/opt/cert/keystore.jks");
> kafkaParams.put("ssl.keystore.password ", "password");
> kafkaParams.put("ssl.key.password", "password");
> kafkaParams.put("ssl.truststore.type", "JKS");
> kafkaParams.put("ssl.truststore.location", "/opt/cert/client.truststore.jks");
> kafkaParams.put("ssl.truststore.password", "password");
> kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopic);
> JavaPairInputDStream<String, String> stream = 
> KafkaUtils.createDirectStream(jsc,
>   String.class,
>   String.class,
>   StringDecoder.class,
>   StringDecoder.class,
>   kafkaParams,
>   topicmap
> );
> JavaDStream<String> lines = stream.map(new Function<Tuple2<String, String>, 
> String>() {
>   public String call(Tuple2<String, String> tuple2) {
>   return tuple2._2();
>   }
> });
> {code}
> {code}
> Exception in thread "main" org.apache.spark.SparkException: 
> java.io.EOFException: Received -1 when reading from channel, socket has 
> likely been closed.
>   at 
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>   at 
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>   at scala.util.Either.fold(Either.scala:97)
>   at 
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>   at 
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>   at 
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>   at 
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
>   at 
> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
> {code}






[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2016-04-26 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15259530#comment-15259530
 ] 

Mario Briggs commented on SPARK-12177:
--

What's the thinking on this one with respect to Structured Streaming? Is the 
idea that Kafka 0.9 will be supported with both the older DStream API 
(KafkaUtils.createDirectStream) AND the newer Structured Streaming way of doing 
things, or will Kafka 0.9 be supported only with the new Structured Streaming 
way? I am going to assume only [~tdas] or [~rxin] have a good idea of 
Structured Streaming (sorry [~mgrover], [~c...@koeninger.org] if i have 
insulted you :-) ), so I'd appreciate it if they can chime in. 

For my part, I am assuming that 0.9 will be supported with the older DStream 
API as well. [~mgrover] how is it going merging Cody's changes into a new 
subproject?

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 has already been released and it introduces a new consumer API that 
> is not compatible with the old one. So, I added the new consumer API. I made 
> separate classes in the package org.apache.spark.streaming.kafka.v09 with the 
> changed API. I didn't remove the old classes, for backward compatibility. Users 
> will not need to change their old Spark applications when they upgrade to the 
> new Spark version.
> Please review my changes.






[jira] [Commented] (SPARK-14745) CEP support in Spark Streaming

2016-04-21 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15251353#comment-15251353
 ] 

Mario Briggs commented on SPARK-14745:
--

Hi Sean,
thanks for 'fixing' the target.

To answer your question: right now the code contains a first-cut, minimal 
implementation that I am hoping others can comment on, in case there are more 
efficient approaches to implement it (a). Then there is the task of seeing how 
it fits in with Structured Streaming, now that quite a bit of info is available 
on this topic (b). Both (a) and (b) would leave us in a better position to 
answer your question. For example, with (b), would this be a UDF or not, and if 
so what does that mean for the implementation and for users w.r.t. performance 
and/or ease of use? Also, in the initial Structured Streaming proposal 
(https://issues.apache.org/jira/secure/attachment/12793419/StreamingDataFrameProposal.pdf)
 the 'Event-trigger' section left me with the impression that it was aiming at 
pattern matching (I am not yet up to date on the new one), so I am curious 
about that. Finally, if (a) leads to some more options, those might lead to 
looking at some changes that make sense. Sorry for the slightly long-winded 
answer, but I hope it gives an idea of the current options.

> CEP support in Spark Streaming
> --
>
> Key: SPARK-14745
> URL: https://issues.apache.org/jira/browse/SPARK-14745
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Mario Briggs
> Attachments: SparkStreamingCEP.pdf
>
>
> Complex Event Processing is an often-used feature in streaming applications. 
> Spark Streaming currently does not have a DSL/API for it. This JIRA is about 
> how/what we can add in Spark Streaming to support CEP out of the box.






[jira] [Commented] (SPARK-14597) Streaming Listener timing metrics should include time spent in JobGenerator's graph.generateJobs

2016-04-20 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15251285#comment-15251285
 ] 

Mario Briggs commented on SPARK-14597:
--

Yeah, I think it is neat. Looks good to me. Thanks.

> Streaming Listener timing metrics should include time spent in JobGenerator's 
> graph.generateJobs
> 
>
> Key: SPARK-14597
> URL: https://issues.apache.org/jira/browse/SPARK-14597
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, Streaming
>Affects Versions: 1.6.1, 2.0.0
>Reporter: Sachin Aggarwal
>Priority: Minor
>
> While looking to tune our streaming application, the piece of info we were 
> looking for was actual processing time per batch. The 
> StreamingListener.onBatchCompleted event provides a BatchInfo object that 
> provided this information. It provides the following data
>  - processingDelay
>  - schedulingDelay
>  - totalDelay
>  - Submission Time
>  The above are essentially calculated from the streaming JobScheduler 
> clocking the processingStartTime and processingEndTime for each JobSet. 
> Another metric available is submissionTime which is when a Jobset was put on 
> the Streaming Scheduler's Queue. 
>  
> So we took processing delay as our actual processing time per batch. However 
> to maintain a stable streaming application, we found that our batch 
> interval had to be a little less than DOUBLE of the processingDelay metric 
> reported. (We are using a DirectKafkaInputStream). On digging further, we 
> found that processingDelay is only clocking time spent in the ForEachRDD 
> closure of the Streaming application and that JobGenerator's 
> graph.generateJobs 
> (https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L248)
>  method takes a significantly larger amount of time.
>  Thus a true reflection of processing time is
>  a - Time spent in JobGenerator's Job Queue (JobGeneratorQueueDelay)
>  b - Time spent in JobGenerator's graph.generateJobs (JobSetCreationDelay)
>  c - Time spent in JobScheduler Queue for a Jobset (existing schedulingDelay 
> metric)
>  d - Time spent in Jobset's job run (existing processingDelay metric)
>  
>  Additionally a JobGeneratorQueue delay (#a) could be due to either 
> graph.generateJobs taking longer than batchInterval or other JobGenerator 
> events like checkpointing adding up time. Thus it would be beneficial to 
> report time taken by the checkpointing Job as well






[jira] [Comment Edited] (SPARK-14745) CEP support in Spark Streaming

2016-04-19 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249261#comment-15249261
 ] 

Mario Briggs edited comment on SPARK-14745 at 4/20/16 5:18 AM:
---

Document covering what CEP is, with examples, features and a possible API 


was (Author: mariobriggs):
Examples, Features and possible API

> CEP support in Spark Streaming
> --
>
> Key: SPARK-14745
> URL: https://issues.apache.org/jira/browse/SPARK-14745
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Mario Briggs
> Attachments: SparkStreamingCEP.pdf
>
>
> Complex Event Processing is an often-used feature in streaming applications. 
> Spark Streaming currently does not have a DSL/API for it. This JIRA is about 
> how/what we can add in Spark Streaming to support CEP out of the box.






[jira] [Updated] (SPARK-14745) CEP support in Spark Streaming

2016-04-19 Thread Mario Briggs (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Briggs updated SPARK-14745:
-
Attachment: SparkStreamingCEP.pdf

Examples, Features and possible API

> CEP support in Spark Streaming
> --
>
> Key: SPARK-14745
> URL: https://issues.apache.org/jira/browse/SPARK-14745
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Mario Briggs
> Attachments: SparkStreamingCEP.pdf
>
>
> Complex Event Processing is an often-used feature in streaming applications. 
> Spark Streaming currently does not have a DSL/API for it. This JIRA is about 
> how/what we can add in Spark Streaming to support CEP out of the box.






[jira] [Created] (SPARK-14745) CEP support in Spark Streaming

2016-04-19 Thread Mario Briggs (JIRA)
Mario Briggs created SPARK-14745:


 Summary: CEP support in Spark Streaming
 Key: SPARK-14745
 URL: https://issues.apache.org/jira/browse/SPARK-14745
 Project: Spark
  Issue Type: New Feature
  Components: Streaming
Reporter: Mario Briggs


Complex Event Processing is an often-used feature in streaming applications. 
Spark Streaming currently does not have a DSL/API for it. This JIRA is about 
how/what we can add in Spark Streaming to support CEP out of the box.






[jira] [Comment Edited] (SPARK-14597) Streaming Listener timing metrics should include time spent in JobGenerator's graph.generateJobs

2016-04-14 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15241358#comment-15241358
 ] 

Mario Briggs edited comment on SPARK-14597 at 4/14/16 3:32 PM:
---

I think there is an opportunity to merge both of your approaches above. Let me 
explain, taking how onOutputOperationStarted/onOutputOperationCompleted is 
already implemented. 

Rather than providing a single time metric and a single start/complete event 
that encompasses generateJob for all OutputStreams, you could provide a 
start/complete event for each individual output stream's generateJob and, at 
batch completion, provide the generateJob metric for all OutputStreams. This 
way a user can also figure out whether an individual output stream is the 
culprit. 

The above would require 2 additional things: pass an eventLoop to the 
DStreamGraph.generateJobs() method, and make this eventLoop a new instance 
(say genJobEventLoop) in JobGenerator rather than the existing eventLoop 
instance there. This is because the existing JobGenerator.eventLoop instance's 
thread is used to actually drive the job generation, and making that thread do 
additional tasks would increase latency in Streaming. The new 'genJobEventLoop' 
would handle GenJobStarted and GenJobCompleted events and use them to fire 
corresponding events to the ListenerBus and gather the generateJob metric for 
all output streams to set it in the JobSet. 


was (Author: mariobriggs):
I think there is an opportunity to merge both your approaches above. Let me 
explain, taking how the onOutputOperationStarted/onOutputOperationCompleted is 
already implemented. 

So rather than providing a single time metric and a single start/complete event 
that encompasses the generateJob for all OutputStreams, you could provide a 
start/complete event for each individual outputstream generateJob and 
onBatchComplete provide the metric for generateJob of all OutputStreams. This 
way a user can also figure out if a individual outputstream is the culprit. 

The above would require 2 additional things - pass an eventLoop to 
DStreamGraph.generateJobs() method. This eventLoop should not be the existing 
eventLoop instance in JobGenerator, but rather another new eventLoop instance 
(say genJobEventLoop) in JobGenerator. This is because the existing 
JobGenerator.eventLoop instance's thread is used to actually drive the Job 
Generation and making that thread do additional tasks will increase latency in 
Streaming. This new 'genJobEventLoop' will handle a GenJobStarted and 
GenJobCompleted event and use those events to fire corresponding events to the 
ListenerBus and gather the generateJob metric for all outputStreams and set it 
in the JobSet 

> Streaming Listener timing metrics should include time spent in JobGenerator's 
> graph.generateJobs
> 
>
> Key: SPARK-14597
> URL: https://issues.apache.org/jira/browse/SPARK-14597
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, Streaming
>Affects Versions: 1.6.1, 2.0.0
>Reporter: Sachin Aggarwal
>Priority: Minor
>
> While looking to tune our streaming application, the piece of info we were 
> looking for was actual processing time per batch. The 
> StreamingListener.onBatchCompleted event provides a BatchInfo object that 
> provided this information. It provides the following data
>  - processingDelay
>  - schedulingDelay
>  - totalDelay
>  - Submission Time
>  The above are essentially calculated from the streaming JobScheduler 
> clocking the processingStartTime and processingEndTime for each JobSet. 
> Another metric available is submissionTime which is when a Jobset was put on 
> the Streaming Scheduler's Queue. 
>  
> So we took processing delay as our actual processing time per batch. However 
> to maintain a stable streaming application, we found that our batch 
> interval had to be a little less than DOUBLE of the processingDelay metric 
> reported. (We are using a DirectKafkaInputStream). On digging further, we 
> found that processingDelay is only clocking time spent in the ForEachRDD 
> closure of the Streaming application and that JobGenerator's 
> graph.generateJobs 
> (https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L248)
>  method takes a significantly larger amount of time.
>  Thus a true reflection of processing time is
>  a - Time spent in JobGenerator's Job Queue (JobGeneratorQueueDelay)
>  b - Time spent in JobGenerator's graph.generateJobs (JobSetCreationDelay)
>  c - Time spent in JobScheduler Queue for a Jobset (existing schedulingDelay 
> metric)
> d - Time spent in Jobset's job run (existing processingDelay metric)

[jira] [Commented] (SPARK-14597) Streaming Listener timing metrics should include time spent in JobGenerator's graph.generateJobs

2016-04-14 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15241358#comment-15241358
 ] 

Mario Briggs commented on SPARK-14597:
--

I think there is an opportunity to merge both of your approaches above. Let me 
explain, taking how onOutputOperationStarted/onOutputOperationCompleted is 
already implemented. 

Rather than providing a single time metric and a single start/complete event 
that encompasses generateJob for all OutputStreams, you could provide a 
start/complete event for each individual output stream's generateJob and, at 
batch completion, provide the generateJob metric for all OutputStreams. This 
way a user can also figure out whether an individual output stream is the 
culprit. 

The above would require 2 additional things: pass an eventLoop to the 
DStreamGraph.generateJobs() method, and make this eventLoop a new instance 
(say genJobEventLoop) in JobGenerator rather than the existing eventLoop 
instance there. This is because the existing JobGenerator.eventLoop instance's 
thread is used to actually drive the job generation, and making that thread do 
additional tasks would increase latency in Streaming. The new 'genJobEventLoop' 
would handle GenJobStarted and GenJobCompleted events and use them to fire 
corresponding events to the ListenerBus and gather the generateJob metric for 
all output streams and set it in the JobSet. 
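
For context, here is a minimal sketch of what an application can already observe 
today through the existing StreamingListener/BatchInfo API; the per-output-stream 
generateJob timing discussed above would be additional information on top of 
this (nothing in the sketch is new API).

{code}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs the delays that BatchInfo already exposes for every completed batch.
class BatchTimingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batch=${info.batchTime} " +
      s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)}ms " +
      s"processingDelay=${info.processingDelay.getOrElse(-1L)}ms " +
      s"totalDelay=${info.totalDelay.getOrElse(-1L)}ms")
  }
}

// Registered with: ssc.addStreamingListener(new BatchTimingListener)
{code}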

> Streaming Listener timing metrics should include time spent in JobGenerator's 
> graph.generateJobs
> 
>
> Key: SPARK-14597
> URL: https://issues.apache.org/jira/browse/SPARK-14597
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, Streaming
>Affects Versions: 1.6.1, 2.0.0
>Reporter: Sachin Aggarwal
>Priority: Minor
>
> While looking to tune our streaming application, the piece of info we were 
> looking for was actual processing time per batch. The 
> StreamingListener.onBatchCompleted event provides a BatchInfo object that 
> provided this information. It provides the following data
>  - processingDelay
>  - schedulingDelay
>  - totalDelay
>  - Submission Time
>  The above are essentially calculated from the streaming JobScheduler 
> clocking the processingStartTime and processingEndTime for each JobSet. 
> Another metric available is submissionTime which is when a Jobset was put on 
> the Streaming Scheduler's Queue. 
>  
> So we took processing delay as our actual processing time per batch. However 
> to maintain a stable streaming application, we found that our batch 
> interval had to be a little less than DOUBLE of the processingDelay metric 
> reported. (We are using a DirectKafkaInputStream). On digging further, we 
> found that processingDelay is only clocking time spent in the ForEachRDD 
> closure of the Streaming application and that JobGenerator's 
> graph.generateJobs 
> (https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L248)
>  method takes a significantly larger amount of time.
>  Thus a true reflection of processing time is
>  a - Time spent in JobGenerator's Job Queue (JobGeneratorQueueDelay)
>  b - Time spent in JobGenerator's graph.generateJobs (JobSetCreationDelay)
>  c - Time spent in JobScheduler Queue for a Jobset (existing schedulingDelay 
> metric)
>  d - Time spent in Jobset's job run (existing processingDelay metric)
>  
>  Additionally a JobGeneratorQueue delay (#a) could be due to either 
> graph.generateJobs taking longer than batchInterval or other JobGenerator 
> events like checkpointing adding up time. Thus it would be beneficial to 
> report time taken by the checkpointing Job as well






[jira] [Commented] (SPARK-13650) Usage of the window() function on DStream

2016-03-03 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15179347#comment-15179347
 ] 

Mario Briggs commented on SPARK-13650:
--

Running locally on my Mac, here is the output:

{quote}
[Stage 0:>  (0 + 0) / 
2]16/03/04 10:38:18 INFO VerifiableProperties: Verifying properties
16/03/04 10:38:18 INFO VerifiableProperties: Property auto.offset.reset is 
overridden to smallest
16/03/04 10:38:18 INFO VerifiableProperties: Property group.id is overridden to 
16/03/04 10:38:18 INFO VerifiableProperties: Property zookeeper.connect is 
overridden to 

---
Time: 1457068098000 ms
---
10


[Stage 6:===>   (2 + 0) / 3]

[Stage 6:===>   (2 + 0) / 3]

{quote}

The '10' is the output from the first batch interval at time '1457068098000 ms'. 
Thereafter, the only output for the next ~2 minutes is the 'Stage 6' progress 
line. A ^C fails to stop the app and I need to do a 'kill -9 <pid>'.

> Usage of the window() function on DStream
> -
>
> Key: SPARK-13650
> URL: https://issues.apache.org/jira/browse/SPARK-13650
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2, 1.6.0, 2.0.0
>Reporter: Mario Briggs
>Priority: Minor
>
> Is there some guidance on the usage of the window() function on DStream? Here 
> is my academic use case, for which it fails.
> Standard word count
>  val ssc = new StreamingContext(sparkConf, Seconds(6))
>  val messages = KafkaUtils.createDirectStream(...)
>  val words = messages.map(_._2).flatMap(_.split(" "))
>  val window = words.window(Seconds(12), Seconds(6)) 
>  window.count().print()
> For the first batch interval it gives the count and then it hangs (inside the 
> unionRDD)
> I say the above use case is academic since one can achieve similar 
> functionality by using instead the more compact API
>    words.countByWindow(Seconds(12), Seconds(6))
> which works fine. 
> Is the first approach above not the intended way of using the .window() API?






[jira] [Created] (SPARK-13650) Usage of the window() function on DStream

2016-03-03 Thread Mario Briggs (JIRA)
Mario Briggs created SPARK-13650:


 Summary: Usage of the window() function on DStream
 Key: SPARK-13650
 URL: https://issues.apache.org/jira/browse/SPARK-13650
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.6.0, 1.5.2, 2.0.0
Reporter: Mario Briggs
Priority: Minor


Is there some guidance on the usage of the window() function on DStream? Here 
is my academic use case, for which it fails.

Standard word count

 val ssc = new StreamingContext(sparkConf, Seconds(6))
 val messages = KafkaUtils.createDirectStream(...)
 val words = messages.map(_._2).flatMap(_.split(" "))
 val window = words.window(Seconds(12), Seconds(6)) 
 window.count().print()

For the first batch interval it gives the count and then it hangs (inside the 
unionRDD)

I say the above use case is academic since one can achieve similar functionality 
by using instead the more compact API
   words.countByWindow(Seconds(12), Seconds(6))
which works fine. 

Is the first approach above not the intended way of using the .window() API?







[jira] [Commented] (SPARK-13027) Add API for updateStateByKey to provide batch time as input

2016-02-04 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15133643#comment-15133643
 ] 

Mario Briggs commented on SPARK-13027:
--

[~rameshaaditya117] sure, more than happy to review

> Add API for updateStateByKey to provide batch time as input
> ---
>
> Key: SPARK-13027
> URL: https://issues.apache.org/jira/browse/SPARK-13027
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Aaditya Ramesh
>
> The StateDStream currently does not provide the batch time as input to the 
> state update function. This is required in cases where the behavior depends 
> on the batch start time.
> We (Conviva) have been patching it manually for the past several Spark 
> versions but we thought it might be useful for others as well.






[jira] [Commented] (SPARK-13027) Add API for updateStateByKey to provide batch time as input

2016-02-04 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15132641#comment-15132641
 ] 

Mario Briggs commented on SPARK-13027:
--

[~rameshaaditya117] if you are tied up with something else, I could take a shot 
at it

> Add API for updateStateByKey to provide batch time as input
> ---
>
> Key: SPARK-13027
> URL: https://issues.apache.org/jira/browse/SPARK-13027
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Aaditya Ramesh
>
> The StateDStream currently does not provide the batch time as input to the 
> state update function. This is required in cases where the behavior depends 
> on the batch start time.
> We (Conviva) have been patching it manually for the past several Spark 
> versions but we thought it might be useful for others as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12739) Details of batch in Streaming tab uses two Duration columns

2016-02-01 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15125974#comment-15125974
 ] 

Mario Briggs commented on SPARK-12739:
--

Maybe the 1st duration can be named 'Output Op Duration' and the 2nd 'Job 
Duration'?

[~jlaskowski] if you are tied up with something else, can I give it a shot?

> Details of batch in Streaming tab uses two Duration columns
> ---
>
> Key: SPARK-12739
> URL: https://issues.apache.org/jira/browse/SPARK-12739
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming, Web UI
>Affects Versions: 2.0.0
>Reporter: Jacek Laskowski
>Priority: Minor
> Attachments: SPARK-12739.png
>
>
> "Details of batch" screen in Streaming tab in web UI uses two Duration 
> columns. I think one should be "Processing Time" while the other "Job 
> Duration".
> See the attachment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13009) spark-streaming-twitter_2.10 does not make it possible to access the raw twitter json

2016-02-01 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15125940#comment-15125940
 ] 

Mario Briggs commented on SPARK-13009:
--

Andrew,
 as you yourself noted, I think it is more appropriate that the Twitter4J API add 
the method to retrieve the raw JSON than for the Spark API to go through hoops; 
furthermore, this problem exists for all consumers of Twitter4J (it is nothing 
specific to Spark)

> spark-streaming-twitter_2.10 does not make it possible to access the raw 
> twitter json
> -
>
> Key: SPARK-13009
> URL: https://issues.apache.org/jira/browse/SPARK-13009
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Andrew Davidson
>Priority: Blocker
>  Labels: twitter
>
> The Streaming-twitter package makes it easy for Java programmers to work with 
> twitter. The implementation returns the raw twitter data in JSON format as a 
> twitter4J StatusJSONImpl object
> JavaDStream tweets = TwitterUtils.createStream(ssc, twitterAuth);
> The status class is different than the raw JSON, i.e. serializing the status 
> object will not be the same as the original json. I have downstream systems that 
> can only process raw tweets, not twitter4J Status objects. 
> Here is my bug/RFE request made to Twitter4J . 
> They asked  I create a spark tracking issue.
> On Thursday, January 21, 2016 at 6:27:25 PM UTC, Andy Davidson wrote:
> Hi All
> Quick problem summary:
> My system uses the Status objects to do some analysis; however, I need to 
> store the raw JSON. There are other systems that process that data that are 
> not written in Java.
> Currently we are serializing the Status Object. The JSON is going to break 
> downstream systems.
> I am using the Apache Spark Streaming spark-streaming-twitter_2.10  
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources
> Request For Enhancement:
> I imagine easy access to the raw JSON is a common requirement. Would it be 
> possible to add a member function to StatusJSONImpl getRawJson(). By default 
> the returned value would be null unless jsonStoreEnabled=True  is set in the 
> config.
> Alternative implementations:
>  
> It should be possible to modify the spark-streaming-twitter_2.10 to provide 
> this support. The solutions is not very clean
> It would require Apache Spark to define its own Status POJO. The current 
> StatusJSONImpl class is marked final.
> The wrapper is not going to work nicely with existing code.
> spark-streaming-twitter_2.10 does not expose all of the twitter streaming 
> API, so many developers are writing their own implementations of 
> org.apache.spark.streaming.twitter.TwitterInputDStream. This makes maintenance 
> difficult. It's not easy to know when the spark implementation for twitter has 
> changed. 
> Code listing for 
> spark-1.6.0/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
> private[streaming]
> class TwitterReceiver(
> twitterAuth: Authorization,
> filters: Seq[String],
> storageLevel: StorageLevel
>   ) extends Receiver[Status](storageLevel) with Logging {
>   @volatile private var twitterStream: TwitterStream = _
>   @volatile private var stopped = false
>   def onStart() {
> try {
>   val newTwitterStream = new 
> TwitterStreamFactory().getInstance(twitterAuth)
>   newTwitterStream.addListener(new StatusListener {
> def onStatus(status: Status): Unit = {
>   store(status)
> }
> Ref: 
> https://forum.processing.org/one/topic/saving-json-data-from-twitter4j.html
> What do people think?
> Kind regards
> Andy
> From:  on behalf of Igor Brigadir 
> 
> Reply-To: 
> Date: Tuesday, January 19, 2016 at 5:55 AM
> To: Twitter4J 
> Subject: Re: [Twitter4J] trouble writing unit test
> Main issue is that the Json object is in the wrong json format.
> eg: "createdAt": 1449775664000 should be "created_at": "Thu Dec 10 19:27:44 
> + 2015", ...
> It looks like the json you have was serialized from a java Status object, 
> which makes json objects different to what you get from the API, 
> TwitterObjectFactory expects json from Twitter (I haven't had any problems 
> using TwitterObjectFactory instead of the Deprecated DataObjectFactory).
> You could "fix" it by matching the keys & values you have with the correct, 
> twitter API json - it should look like the example here: 
> https://dev.twitter.com/rest/reference/get/statuses/show/%3Aid
> But it might be easier to download the tweets again, but this time use 
> TwitterObjectFactory.getRawJSON(status) to get the Original Json 

[jira] [Commented] (SPARK-13009) spark-streaming-twitter_2.10 does not make it possible to access the raw twitter json

2016-02-01 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15127718#comment-15127718
 ] 

Mario Briggs commented on SPARK-13009:
--

IMHO, even if StatusJSONImpl were not final, this should still be done in Twitter4J

> spark-streaming-twitter_2.10 does not make it possible to access the raw 
> twitter json
> -
>
> Key: SPARK-13009
> URL: https://issues.apache.org/jira/browse/SPARK-13009
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Andrew Davidson
>Priority: Blocker
>  Labels: twitter
>
> The Streaming-twitter package makes it easy for Java programmers to work with 
> twitter. The implementation returns the raw twitter data in JSON format as a 
> twitter4J StatusJSONImpl object
> JavaDStream tweets = TwitterUtils.createStream(ssc, twitterAuth);
> The status class is different than the raw JSON, i.e. serializing the status 
> object will not be the same as the original json. I have downstream systems that 
> can only process raw tweets, not twitter4J Status objects. 
> Here is my bug/RFE request made to Twitter4J . 
> They asked  I create a spark tracking issue.
> On Thursday, January 21, 2016 at 6:27:25 PM UTC, Andy Davidson wrote:
> Hi All
> Quick problem summary:
> My system uses the Status objects to do some analysis; however, I need to 
> store the raw JSON. There are other systems that process that data that are 
> not written in Java.
> Currently we are serializing the Status Object. The JSON is going to break 
> downstream systems.
> I am using the Apache Spark Streaming spark-streaming-twitter_2.10  
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources
> Request For Enhancement:
> I imagine easy access to the raw JSON is a common requirement. Would it be 
> possible to add a member function to StatusJSONImpl getRawJson(). By default 
> the returned value would be null unless jsonStoreEnabled=True  is set in the 
> config.
> Alternative implementations:
>  
> It should be possible to modify the spark-streaming-twitter_2.10 to provide 
> this support. The solutions is not very clean
> It would require Apache Spark to define its own Status POJO. The current 
> StatusJSONImpl class is marked final.
> The wrapper is not going to work nicely with existing code.
> spark-streaming-twitter_2.10 does not expose all of the twitter streaming 
> API, so many developers are writing their own implementations of 
> org.apache.spark.streaming.twitter.TwitterInputDStream. This makes maintenance 
> difficult. It's not easy to know when the spark implementation for twitter has 
> changed. 
> Code listing for 
> spark-1.6.0/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
> private[streaming]
> class TwitterReceiver(
> twitterAuth: Authorization,
> filters: Seq[String],
> storageLevel: StorageLevel
>   ) extends Receiver[Status](storageLevel) with Logging {
>   @volatile private var twitterStream: TwitterStream = _
>   @volatile private var stopped = false
>   def onStart() {
> try {
>   val newTwitterStream = new 
> TwitterStreamFactory().getInstance(twitterAuth)
>   newTwitterStream.addListener(new StatusListener {
> def onStatus(status: Status): Unit = {
>   store(status)
> }
> Ref: 
> https://forum.processing.org/one/topic/saving-json-data-from-twitter4j.html
> What do people think?
> Kind regards
> Andy
> From:  on behalf of Igor Brigadir 
> 
> Reply-To: 
> Date: Tuesday, January 19, 2016 at 5:55 AM
> To: Twitter4J 
> Subject: Re: [Twitter4J] trouble writing unit test
> Main issue is that the Json object is in the wrong json format.
> eg: "createdAt": 1449775664000 should be "created_at": "Thu Dec 10 19:27:44 
> + 2015", ...
> It looks like the json you have was serialized from a java Status object, 
> which makes json objects different to what you get from the API, 
> TwitterObjectFactory expects json from Twitter (I haven't had any problems 
> using TwitterObjectFactory instead of the Deprecated DataObjectFactory).
> You could "fix" it by matching the keys & values you have with the correct, 
> twitter API json - it should look like the example here: 
> https://dev.twitter.com/rest/reference/get/statuses/show/%3Aid
> But it might be easier to download the tweets again, but this time use 
> TwitterObjectFactory.getRawJSON(status) to get the Original Json from the 
> Twitter API, and save that for later. (You must have jsonStoreEnabled=True in 
> your config, and call getRawJSON in the same thread as .showStatus() or 
> 
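
(For reference, a minimal sketch of the jsonStoreEnabled + getRawJSON approach 
described above, outside Spark and purely illustrative; the credential environment 
variable names are placeholders, and getRawJSON must be called on the thread that 
delivered the status, i.e. inside onStatus().)

import twitter4j.{Status, StatusAdapter, TwitterObjectFactory, TwitterStreamFactory}
import twitter4j.conf.ConfigurationBuilder

object RawJsonCapture {
  def main(args: Array[String]): Unit = {
    val conf = new ConfigurationBuilder()
      .setJSONStoreEnabled(true)                              // required for getRawJSON
      .setOAuthConsumerKey(sys.env("TW_CONSUMER_KEY"))        // placeholder env vars
      .setOAuthConsumerSecret(sys.env("TW_CONSUMER_SECRET"))
      .setOAuthAccessToken(sys.env("TW_ACCESS_TOKEN"))
      .setOAuthAccessTokenSecret(sys.env("TW_ACCESS_SECRET"))
      .build()

    val stream = new TwitterStreamFactory(conf).getInstance()
    stream.addListener(new StatusAdapter {
      override def onStatus(status: Status): Unit = {
        // must run on the same thread that delivered the status
        val rawJson = TwitterObjectFactory.getRawJSON(status)
        println(rawJson)                                      // store/forward in real use
      }
    })
    stream.sample()
  }
}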

[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2016-01-21 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15110994#comment-15110994
 ] 

Mario Briggs commented on SPARK-12177:
--

bq. If one uses the kafka v9 jar even when using the old consumer API, it can 
only work with a Kafka v9 broker. 

I tried it on a single-system setup (v0.9 client talking to a v0.8 server side) 
and the consumers had a problem (old or new). The producers, though, worked fine. 
So you are right. So then we will have kafka-assembly and 
kafka-assembly-v09/new, each including their respective version of the kafka jars, 
right? (I guess you were thinking of 2 different assemblies all along, and I 
guessed the other way round. Duh, IRC might have been faster.)

With the above confirmed, it automatically rules out 'The public API 
signatures (of KafkaUtils in v0.9 subproject) are different and do not clash 
(with KafkaUtils in original kafka subproject) and hence can be added to the 
existing (original kafka subproject) KafkaUtils class.' 

So the only thing left, it seems, is to use 'new' or a better term instead of 
'v09', since we both agree on that. Great, and thanks Mark.

How's the 'python/pyspark/streaming/kafka-v09(new).py' going?

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2016-01-20 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15109056#comment-15109056
 ] 

Mario Briggs commented on SPARK-12177:
--

bq. I totally understand what you mean. However, kafka has its own assembly in 
Spark and the way the code is structured right now, both the new API and old 
API would go in the same assembly so it's important to have a different package 
name. Also, I think for our end users transitioning from old to new API, I 
foresee them having 2 versions of their spark-kafka app. One that works with 
the old API and one with the new API. And, I think it would be an easier 
transition if they could include both the kafka API versions in the spark 
classpath and pick and choose which app to run without mucking with maven 
dependencies and re-compiling when they want to switch. Let me know if you 
disagree.

Since this is WIP, I myself have at least 1 more option different from what I 
suggested above... I put just one out to get the conversation rolling, so thanks for 
chipping in. Thanks for bringing up the kafka-assembly part… the assembly jar 
does include all dependencies too, so we would include kafka v0.9’s jars I 
guess? I remember Cody mentioning that a v0.8 to v0.9 upgrade on the kafka side 
involves upgrading the brokers… I think that is not required when the client uses a 
v0.9 jar while consuming only the older high level/low level API and talking 
to a v0.8 kafka cluster. So we can go with 1 kafka-assembly, and then my 
suggestion above is itself broken since we would have issues of the same package & 
class names.

1 thought around not introducing the version in the package name or class name 
(I see that Flink does it in the class name) was to avoid forcing us to create 
v0.10/v0.11 packages (and customers to change code and recompile), even if 
those releases of kafka don’t have client-API or other such changes that 
warrant us making a new version (also, spark got away without putting a 
version # in until now, which means less work in Spark, so I am not sure we want 
to start forcing this work going forward). Once we introduce the version #, we need to 
ensure it is in sync with kafka.

That’s why 1 earlier idea I mentioned in this JIRA was 'The public API 
signatures (of KafkaUtils in the v0.9 subproject) are different and do not clash 
(with KafkaUtils in the original kafka subproject) and hence can be added to the 
existing (original kafka subproject) KafkaUtils class.’ Cody mentioned that we 
need to get others on the same page for this idea, so I guess we really need 
the committers to chime in here. Of course I forgot to answer Nikita’s 
follow-up question - 'do you mean that we would change the original KafkaUtils 
by adding new functions for the new DirectIS/KafkaRDD but using them from a separate 
module with kafka09 classes’? To be clear, these new public methods added to the 
original kafka subproject’s ‘KafkaUtils’ will make use of the 
DirectKafkaInputDStream, KafkaRDD, KafkaRDDPartition and OffsetRange classes that are 
in a new v09 package (internal of course). In short, we don’t have a new 
subproject. (I skipped the KafkaCluster class from the list, because I am 
thinking it makes more sense to call this class something like 'KafkaClient' 
instead, going forward)

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2016-01-20 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15109056#comment-15109056
 ] 

Mario Briggs edited comment on SPARK-12177 at 1/20/16 6:16 PM:
---

bq. I totally understand what you mean. However, kafka has its own assembly in 
Spark and the way the code is structured right now, both the new API and old 
API would go in the same assembly so it's important to have a different package 
name. Also, I think for our end users transitioning from old to new API, I 
foresee them having 2 versions of their spark-kafka app. One that works with 
the old API and one with the new API. And, I think it would be an easier 
transition if they could include both the kafka API versions in the spark 
classpath and pick and choose which app to run without mucking with maven 
dependencies and re-compiling when they want to switch. Let me know if you 
disagree.

Hey Mark,
Since this is WIP, I myself have at least 1 more option different from what I 
suggested above... I put just one out to get the conversation rolling, so thanks for 
chipping in. Thanks for bringing up the kafka-assembly part… the assembly jar 
does include all dependencies too, so we would include kafka v0.9’s jars I 
guess? I remember Cody mentioning that a v0.8 to v0.9 upgrade on the kafka side 
involves upgrading the brokers… I think that is not required when the client uses a 
v0.9 jar while consuming only the older high level/low level API and talking 
to a v0.8 kafka cluster. So we can go with 1 kafka-assembly, and then my 
suggestion above is itself broken since we would have issues of the same package & 
class names.

1 thought around not introducing the version in the package name or class name 
(I see that Flink does it in the class name) was to avoid forcing us to create 
v0.10/v0.11 packages (and customers to change code and recompile), even if 
those releases of kafka don’t have client-API or other such changes that 
warrant us making a new version (also, spark got away without putting a 
version # in until now, which means less work in Spark, so I am not sure we want 
to start forcing this work going forward). Once we introduce the version #, we need to 
ensure it is in sync with kafka.

That’s why 1 earlier idea I mentioned in this JIRA was 'The public API 
signatures (of KafkaUtils in the v0.9 subproject) are different and do not clash 
(with KafkaUtils in the original kafka subproject) and hence can be added to the 
existing (original kafka subproject) KafkaUtils class.’ This also addresses 
the issues you mention above. Cody mentioned that we need to get others on the 
same page for this idea, so I guess we really need the committers to chime in 
here. Of course I forgot to answer Nikita’s follow-up question - 'do you mean 
that we would change the original KafkaUtils by adding new functions for the new 
DirectIS/KafkaRDD but using them from a separate module with kafka09 classes’? 
To be clear, these new public methods added to the original kafka subproject’s 
‘KafkaUtils’ will make use of the 
DirectKafkaInputDStream, KafkaRDD, KafkaRDDPartition and OffsetRange classes that are 
in a new v09 package (internal of course). In short, we don’t have a new 
subproject. (I skipped the KafkaCluster class from the list, because I am 
thinking it makes more sense to call this class something like 'KafkaClient' 
instead, going forward)


was (Author: mariobriggs):
bq. I totally understand what you mean. However, kafka has its own assembly in 
Spark and the way the code is structured right now, both the new API and old 
API would go in the same assembly so it's important to have a different package 
name. Also, I think for our end users transitioning from old to new API, I 
foresee them having 2 versions of their spark-kafka app. One that works with 
the old API and one with the new API. And, I think it would be an easier 
transition if they could include both the kafka API versions in the spark 
classpath and pick and choose which app to run without mucking with maven 
dependencies and re-compiling when they want to switch. Let me know if you 
disagree.

Since this is WIP, i myself have atleast 1 more different option to what i 
suggested above... put just one to get the conversation rolling, so thanks for 
chipping in. Thanks for bringing out the kafka-assembly part… the assembly jar 
does include all dependencies too, so we would include kafka's v0.9’s jars i 
guess? I remember Cody mentioning that a v0.8 to v0.9 upgrade on kafka side 
involves upgrading the brokers… i think that is not required when client uses a 
v0.9 jar though consuming only the older high level/low level API and talking 
to a v0.8 kafka cluster. So we can go with 1 kafka-assembly and then my 
suggestion above itself is broken since we would have issues of same package & 
class names.

1 thought around not introducing the version in the package name or class name 
(I see that Flink does 

[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2016-01-18 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105560#comment-15105560
 ] 

Mario Briggs commented on SPARK-12177:
--

Hi Nikita,
 great. 

 1 - We should also have a python/pyspark/streaming/kafka-v09.py as well that 
matches our external/kafka-v09
 2 - Why do you have the Broker.scala class? Unless I am missing something, it 
should be knocked off
 3 - I think the package should be 'org.apache.spark.streaming.kafka' only in 
external/kafka-v09 and not 'org.apache.spark.streaming.kafka.v09'. This is 
because we produce a jar with a different name (the user picks which one, and even if 
he/she mismatches, it errors correctly since the KafkaUtils method signatures 
are different)
  
 

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2016-01-05 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15082674#comment-15082674
 ] 

Mario Briggs commented on SPARK-12177:
--

implemented here - 
https://github.com/mariobriggs/spark/commit/2fcbb721b99b48e336ba7ef7c317c279c9483840

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2016-01-04 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15081565#comment-15081565
 ] 

Mario Briggs commented on SPARK-12177:
--

Confirmed that the frequent KafkaConsumer object creation was the reason for the 
consumer.position() method hang. Cleaned up the frequent KafkaConsumer object 
creation in 
https://github.com/mariobriggs/spark/commit/e40df7ee70fd72418969e8f9c81a1fee304b8b1c
 and it resolved the issue

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-30 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15075230#comment-15075230
 ] 

Mario Briggs commented on SPARK-12177:
--

you could also get just a few of the records you want i.e. not all in 1 shot

override def getNext(): R = {
  if (iter == null || !iter.hasNext) {
iter = consumer.poll(pollTime).iterator()
  }

  if (!iter.hasNext) {
if ( requestOffset < part.untilOffset ) {
   // need to make another poll() and recheck above. So make a 
recursive call i.e. 'return getnext()' here ?
}
finished = true
null.asInstanceOf[R]
  } else {
   ...

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-30 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15075172#comment-15075172
 ] 

Mario Briggs commented on SPARK-12177:
--

Nikita,

thank you. 

A-C : Looks good to me. (BTW I didn't review changes related to the receiver-based 
approach, even in the earlier round)

D - I think it is OK for KafkaTestUtils to have a dependency on core, since that 
is more of our internal test approach (however, I haven't spent time to think if 
even that can be bettered). On the higher issue, I think Kafka *will* provide 
TopicPartition as serializable, which will make this moot, but good that we 
have tracked it here

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-30 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15075205#comment-15075205
 ] 

Mario Briggs commented on SPARK-12177:
--

Very good point about creating the KafkaConsumer frequently. In fact, Praveen is 
investigating if that is the reason the 'position()' method hangs when we have 
batch intervals at 200ms and below.
 So one way to optimize it is this: since the 'compute' method in 
DirectKafkaInputDStream runs in the driver, why not store the 'KafkaConsumer' 
rather than the KafkaCluster as a member variable in this class. Of course we 
will need to mark it transient, so that it's not attempted to be serialized, and 
that means always checking if it is null and re-initializing if required, before use. 
The only use of the Consumer here is to find the new latest offsets, so we will 
have to massage that method for use with an existing consumer object.
Or another option is to let KafkaCluster have a KafkaConsumer instance as a 
member variable, with the same noted aspects about being transient (a rough sketch 
follows below).

This also means moving the part about fetching the leader ipAddress for 
getPreferredLocations() away from KafkaRDD.getPartitions() to 
DirectKafkaInputDStream.compute() and having 'leaders' as a constructor param to 
KafkaRDD (I now realize that KafkaRDD is private, so we are not exposing that on 
a public API as I thought earlier)
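
(A rough sketch of that transient-member idea, with hypothetical names rather than 
the actual DirectKafkaInputDStream/KafkaCluster code; it assumes the deserializers 
are present in kafkaParams and uses the 0.9 client signatures.)

import java.{util => ju}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

class LatestOffsetFetcher(kafkaParams: Map[String, Object]) extends Serializable {

  // not serialized with the enclosing object; re-created on demand after deserialization
  @transient private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _

  private def getOrCreate(): KafkaConsumer[Array[Byte], Array[Byte]] = {
    if (consumer == null) {
      val props = new ju.Properties()
      kafkaParams.foreach { case (k, v) => props.put(k, v) }
      consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    }
    consumer
  }

  // find the latest offset for one partition, reusing the cached consumer
  def latestOffset(tp: TopicPartition): Long = {
    val c = getOrCreate()
    c.assign(ju.Arrays.asList(tp))
    c.seekToEnd(tp)                     // varargs form in the 0.9 client
    c.position(tp)
  }
}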
  

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-30 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15075230#comment-15075230
 ] 

Mario Briggs edited comment on SPARK-12177 at 12/30/15 5:24 PM:


you could also get just a few of the records you want i.e. not all in 1 shot. 
So a gist below

override def getNext(): R = {
  if (iter == null || !iter.hasNext) {
iter = consumer.poll(pollTime).iterator()
  }

  if (!iter.hasNext) {
if ( requestOffset < part.untilOffset ) {

   // need to make another poll() and recheck above. So make a 
recursive call i.e. 'return getnext()' here ?

}
finished = true
null.asInstanceOf[R]
  } else {
   ...


was (Author: mariobriggs):
you could also get just a few of the records you want i.e. not all in 1 shot

override def getNext(): R = {
  if (iter == null || !iter.hasNext) {
iter = consumer.poll(pollTime).iterator()
  }

  if (!iter.hasNext) {
if ( requestOffset < part.untilOffset ) {
   // need to make another poll() and recheck above. So make a 
recursive call i.e. 'return getnext()' here ?
}
finished = true
null.asInstanceOf[R]
  } else {
   ...

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-29 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15074131#comment-15074131
 ] 

Mario Briggs commented on SPARK-12177:
--

With regards to review comment 'C' (2 comments above), the kafka folks have 
clarified that the timeout parameter on the poll() method is the time to spend 
waiting until the client side gets the data (from the server), and not the time to 
spend waiting for data to become available on the server. This means that one 
might have to call poll() more than once, until we get to the ConsumerRecord at 
'untilOffset' 
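
(To illustrate, a rough sketch of that poll-more-than-once loop, with hypothetical 
names; it assumes the consumer is already assigned to the partition, and a real 
implementation should bound the retries or fail loudly rather than spin when no 
data arrives.)

import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// read [fromOffset, untilOffset) for one partition, polling as many times as needed
def readRange[K, V](
    consumer: Consumer[K, V],
    tp: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    pollTimeMs: Long): Seq[ConsumerRecord[K, V]] = {
  val out = new ArrayBuffer[ConsumerRecord[K, V]]()
  consumer.seek(tp, fromOffset)
  var next = fromOffset
  var done = false
  while (!done && next < untilOffset) {
    // a single poll() may return only part of the range, or nothing at all
    val records = consumer.poll(pollTimeMs).records(tp).asScala
    for (r <- records) {
      if (r.offset() >= untilOffset) {
        done = true                     // passed the requested range
      } else {
        out += r
        next = r.offset() + 1
      }
    }
  }
  out
}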

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-28 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073035#comment-15073035
 ] 

Mario Briggs commented on SPARK-12177:
--

On the issue of lots of duplicate code from the original, 1 thought was whether 
we need to upgrade the older Receiver-based approach to the new Consumer API at all. 
The Direct approach has so many benefits over the older Receiver-based approach, 
and I can't think of a drawback, that one might make the argument that we don't 
upgrade the latter at all: it remains on the older kafka consumer API and gets 
deprecated over a long period of time. Thoughts?

If we do go the above way, then there is very trivial overlap of code between 
the original and this new consumer implementation. The public API signatures are 
different and do not clash, and hence can be added to the existing KafkaUtils 
class.

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 is already released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility. Users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-28 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073020#comment-15073020
 ] 

Mario Briggs edited comment on SPARK-12177 at 12/28/15 7:02 PM:


  Hi Nikita,
 thanks. Here are my review comments. I couldn't find a way to add them on the 
PR review in GitHub, so I added them here. My comments are a little detailed, 
since I too developed an implementation and tested it along with my colleague 
Praveen :-) after the initial discussion on the dev list 

A - KafkaCluster class
 getPartitions()
 seek()
callers of the above methods
all other methods that use withConsumer()

 These should not return an ‘Either’, but rather just the expected object (the 
‘Right’). The reason for the ‘Either’ object in the earlier code was due to the 
fact that the earlier kafka client had to deal with trying the operation on all the 
‘seedBrokers’ and handle the case where some of them were down. Similarly, when 
dealing with ‘leaders’, the client had to try the operation on all leaders for a TP 
(TopicAndPartition). When we use the new kafka-clients API, we don’t have to 
deal with trying against all the seedBrokers, leaders etc., since the new 
KafkaConsumer object internally handles all those details.
Notice that in the earlier code, withBrokers() tries to connect() and invoke 
the passed in function multiple times with the brokers.forEach() and hence the 
need to accumulate errors. The earlier code also did a ‘return’ immediately 
when successful with one of the brokers. This does not apply with the new 
KafkaConsumer object.

getPartitions() - 
https://github.com/nikit-os/spark/blob/kafka-09-consumer-api/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala#L62
  consumer.partitionsFor() in the java API will return null if the topic doesn’t 
exist. If you don’t handle that, you run into an NPE when the user specifies a 
topic that doesn’t exist or makes a typo in the topic name (also, not returning 
an exception saying the partition doesn’t exist is not right)

our implementation is at - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala.
 If it is easier for you that we issue a PR to your repo, let us know
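
(A small illustrative sketch of the null check being asked for here, not the actual 
KafkaCluster code; the names are hypothetical.)

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// partitionsFor() returns null (not an empty list) for an unknown topic in the 0.9
// client, so turn that into an explicit error instead of a later NPE
def partitionsFor(consumer: Consumer[_, _], topics: Set[String]): Set[TopicPartition] = {
  topics.flatMap { topic =>
    val infos = consumer.partitionsFor(topic)
    if (infos == null) {
      throw new IllegalArgumentException(s"Topic $topic does not exist or has no metadata")
    }
    infos.asScala.map(pi => new TopicPartition(pi.topic(), pi.partition()))
  }
}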

B - KafkaRDD class
   getPreferredLocations()
  this method is missing in your code. The earlier implementation from Cody had 
the optimisation that if Kafka and the spark code (KafkaRDD) was running on the 
same cluster, then the RDD partition for a particular TopicPartition, would be 
local to that TopicPartition leader. Could you please add code to bring back 
this functionality.

 Our implementation, pulled this info inside the getPartitions- 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L52.
 Probably more efficient to do it inside compute() of the DStream, but that 
meant exposing ‘leaders’ on the KafkaRDD constructor as discussed - 
https://mail-archives.apache.org/mod_mbox/spark-dev/201512.mbox/%3CCAKWX9VV34Dto9irT3ZZfH78EoXO3bV3VHN8YYvTxfnyvGcRsQw%40mail.gmail.com%3E

C - KafkaRDDIterator class

 getNext()
  As mentioned in issue #1 noted here - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/README.md#issues-noted-with-kafka-090-consumer-api
, KafkaConsumer.poll(x) is not guaranteed to return the data when x is a small 
value. We are following up this issue with kafka - 
https://issues.apache.org/jira/browse/KAFKA-3044 . I see that you have made 
this a configurable value in your implementation, which is good, but either way, 
until this behaviour is clarified or even otherwise, we need this assert - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L171
 or else we will be silently skipping data without the user knowing it (either 
with the default value or with the user specifying a smaller value)

D- Non use of TopicPartition class of new Consumer

   You have already figured out that this class is not Serializable, and hence 
in the public interface you have used the older TopicAndPartition class. We 
have raised this issue https://issues.apache.org/jira/browse/KAFKA-3029 with 
Kafka and may be provided with one (yet to see). However, using the older 
TopicAndPartition class in our public API, which introduces a dependency on the 
older kafka core rather than just the kafka-clients jar, I would think is not the 
preferred approach. If we are not provided with a serializable TopicPartition, 
then we should rather use our own serializable object (or just a tuple of 
string, int, Long) inside of DirectKafkaInputDStream to ensure it is 
Serializable.
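
(Along the lines of the tuple-of-string-int-Long idea in D, a trivial illustrative 
stand-in, not the actual OffsetRange class:)

// serializable replacement for (TopicPartition + offset range) for use inside the DStream
case class PartitionOffsetRange(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long) extends Serializable {

  // rebuild the non-serializable Kafka object only where it is actually needed
  def topicPartition(): org.apache.kafka.common.TopicPartition =
    new org.apache.kafka.common.TopicPartition(topic, partition)
}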



was (Author: mariobriggs):
  Hi Nikita,
 thanks. Here are my review 

[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-28 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073020#comment-15073020
 ] 

Mario Briggs commented on SPARK-12177:
--

  Hi Nikita,
 thanks. Here are my review comments. I couldn't find a way to add them on the 
PR review in GitHub, so I added them here. My comments are a little detailed, 
since I too developed an implementation and tested it along with my colleague 
Praveen :-) after the initial discussion on the dev list 

A - KafkaCluster class
 getPartitions()
 seek()
callers of the above methods
all other methods that use withConsumer()

 These should not return an ‘Either’, but rather just the expected object (the 
‘Right’). The reason for the ‘Either’ object in the earlier code was due to the 
fact that the earlier kafka client had to deal with trying the operation on all the 
‘seedBrokers’ and handle the case where some of them were down. Similarly, when 
dealing with ‘leaders’, the client had to try the operation on all leaders for a TP 
(TopicAndPartition). When we use the new kafka-clients API, we don’t have to 
deal with trying against all the seedBrokers, leaders etc., since the new 
KafkaConsumer object internally handles all those details.
Notice that in the earlier code, withBrokers() tries to connect() and invoke 
the passed in function multiple times with the brokers.forEach() and hence the 
need to accumulate errors. The earlier code also did a ‘return’ immediately 
when successful with one of the brokers. This does not apply with the new 
KafkaConsumer object.

getPartitions() - 
https://github.com/nikit-os/spark/blob/kafka-09-consumer-api/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala#L62
  consumer.partitionsFor() in the java API will return null if the topic doesn’t 
exist. If you don’t handle that, you run into an NPE when the user specifies a 
topic that doesn’t exist or makes a typo in the topic name (also, not returning 
an exception saying the partition doesn’t exist is not right)

our implementation is at - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala.
 If it is easier for you that we issue a PR to your repo, let us know

B - KafkaRDD class
   getPreferredLocations()
  this method is missing in your code. The earlier implementation from Cody had 
the optimisation that if Kafka and the spark code (KafkaRDD) was running on the 
same cluster, then the RDD partition for a particular TopicPartition, would be 
local to that TopicPartition leader. Could you please add code to bring back 
this functionality.

 Our implementation, pulled this info inside the getPartitions- 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L52.
 Probably more efficient to do it inside compute() of the DStream, but that 
meant exposing ‘leaders’ on the KafkaRDD constructor as discussed - 
https://mail-archives.apache.org/mod_mbox/spark-dev/201512.mbox/%3CCAKWX9VV34Dto9irT3ZZfH78EoXO3bV3VHN8YYvTxfnyvGcRsQw%40mail.gmail.com%3E

C - KafkaRDDIterator class

 getNext()
  As mentioned in issue #1 noted here - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/README.md#issues-noted-with-kafka-090-consumer-api
, KafkaConsumer.poll(x) is not guaranteed to return the data when x is a small 
value. We are following up with [this issue with 
kafka](https://issues.apache.org/jira/browse/KAFKA-3044). I see that you have 
made this a configurable value in your implementation, which is good, but either 
way, until this behaviour is clarified or even otherwise, we need [this 
assert](https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L171)
 or else we will be silently skipping data without the user knowing it (either 
with the default value or with the user specifying a smaller value)

D- Non use of TopicPartition class of new Consumer

   You have already figured out that this class is not Serializable, and hence 
in the public interface you have used the older TopicAndPartition class. We 
have raised [this issue](https://issues.apache.org/jira/browse/KAFKA-3029) with 
Kafka and may be provided with one (yet to see). However, using the older 
TopicAndPartition class in our public API, which introduces a dependency on the 
older kafka core rather than just the kafka-clients jar, I would think is not the 
preferred approach. If we are not provided with a serializable TopicPartition, 
then we should rather use our own serializable object (or just a tuple of 
string, int, Long) inside of DirectKafkaInputDStream to ensure it is 
Serializable.


> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
>