[jira] [Comment Edited] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-19 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15679472#comment-15679472
 ] 

Cody Koeninger edited comment on SPARK-18475 at 11/19/16 4:02 PM:
--

Yes, an RDD does have an ordering guarantee, it's an iterator per partition, 
same as Kafka.  Yes, that guarantee is part of the Kafka data model (Burak, if 
you don't believe me, go reread 
http://kafka.apache.org/documentation.html#introduction  search for "order").  
Because the direct stream (and the structured stream that uses the same model) 
has a 1:1 correspondence between kafka partition and spark partition, that 
guarantee is preserved.  The existing distortions between the Kafka model and 
the direct stream / structured stream are enough as it is; we don't need to add 
more.



was (Author: c...@koeninger.org):
Yes, an RDD does have an ordering guarantee, it's an iterator per partition, 
same as Kafka.  Yes, that guarantee is part of the Kafka data model (Burak, if 
you don't believe me, go reread 
http://kafka.apache.org/documentation.html#introduction  search for "order").  
Because the direct stream (and the structured stream that uses the same model) 
has a 1:! correspondence between kafka partition and spark partition, that 
guarantee is preserved.  The existing distortions between the Kafka model and 
the direct stream / structured stream are enough as it is, we don't need to add 
more.


> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth it for handling data skew and increasing parallelism, 
> especially in ETL use cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-19 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15679472#comment-15679472
 ] 

Cody Koeninger commented on SPARK-18475:


Yes, an RDD does have an ordering guarantee, it's an iterator per partition, 
same as Kafka.  Yes, that guarantee is part of the Kafka data model (Burak, if 
you don't believe me, go reread 
http://kafka.apache.org/documentation.html#introduction  search for "order").  
Because the direct stream (and the structured stream that uses the same model) 
has a 1:1 correspondence between kafka partition and spark partition, that 
guarantee is preserved.  The existing distortions between the Kafka model and 
the direct stream / structured stream are enough as it is; we don't need to add 
more.


> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth it for handling data skew and increasing parallelism, 
> especially in ETL use cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674459#comment-15674459
 ] 

Cody Koeninger commented on SPARK-18475:


This has come up several times, and my answer is consistently the same: as 
Ofir said, in the Kafka model parallelism is bounded by the number of partitions.  
Breaking that model breaks user expectations, e.g. concerning ordering.  It's 
fine for you if this helps your specific use case, but I think it is not 
appropriate for general use.  I'd recommend people fix their skew and/or 
repartition at the producer level.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth it for handling data skew and increasing parallelism, 
> especially in ETL use cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18386) Batch mode SQL source for Kafka

2016-11-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654308#comment-15654308
 ] 

Cody Koeninger commented on SPARK-18386:


That should work.  There may be dependency conflicts trying to put a 0.10.1 jar 
in the same job as a 0.10.0, though.

> Batch mode SQL source for Kafka
> ---
>
> Key: SPARK-18386
> URL: https://issues.apache.org/jira/browse/SPARK-18386
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Cody Koeninger
>
> An SQL equivalent to the DStream KafkaUtils.createRDD would be useful for 
> querying over a defined batch of offsets.
> The possibility of Kafka 0.10.1 time indexing (e.g. a batch from timestamp X 
> to timestamp Y) should be taken into account, even if not available in the 
> initial implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.1.0

2016-11-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654295#comment-15654295
 ] 

Cody Koeninger commented on SPARK-18057:


I definitely do not want another copy-paste situation, we've already got too 
many of them.

I'm hoping that 0.10.1 is close enough to 0.10.0 that dependency issues can be 
worked out in a more satisfactory way (e.g. kafka is marked as provided, the 
0.10.1 integration jar depends on the 0.10 integration jar and just adds 
methods for time indexing) but I haven't really had time to look at it.

> Update structured streaming kafka from 10.0.1 to 10.1.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18386) Batch mode SQL source for Kafka

2016-11-09 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-18386:
--

 Summary: Batch mode SQL source for Kafka
 Key: SPARK-18386
 URL: https://issues.apache.org/jira/browse/SPARK-18386
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Reporter: Cody Koeninger


An SQL equivalent to the DStream KafkaUtils.createRDD would be useful for 
querying over a defined batch of offsets.

The possibility of Kafka 0.10.1 time indexing (e.g. a batch from timestamp X to 
timestamp Y) should be taken into account, even if not available in the initial 
implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18371) Spark Streaming backpressure bug - generates a batch with large number of records

2016-11-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649638#comment-15649638
 ] 

Cody Koeninger commented on SPARK-18371:


Thanks for digging into this.  The other thing I noticed when working on

https://github.com/apache/spark/pull/15132

is that the return value of getLatestRate was cast to Int, which seems wrong 
and possibly subject to overflow.

If you have the ability to test that PR (it shouldn't require a spark redeploy, 
since the kafka jar is standalone), you may want to test it out.

> Spark Streaming backpressure bug - generates a batch with large number of 
> records
> -
>
> Key: SPARK-18371
> URL: https://issues.apache.org/jira/browse/SPARK-18371
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0
>Reporter: mapreduced
> Attachments: GiantBatch2.png, GiantBatch3.png, 
> Giant_batch_at_23_00.png, Look_at_batch_at_22_14.png
>
>
> When the streaming job is configured with backpressureEnabled=true, it 
> generates a GIANT batch of records if the processing time + scheduled delay 
> is (much) larger than batchDuration. This creates a backlog of records like 
> no other and results in batches queueing for hours until it chews through 
> this giant batch.
> Expectation is that it should reduce the number of records per batch in some 
> time to whatever it can really process.
> Attaching some screen shots where it seems that this issue is quite easily 
> reproducible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18258) Sinks need access to offset representation

2016-11-04 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15638555#comment-15638555
 ] 

Cody Koeninger commented on SPARK-18258:


Sure, added, let me know if I'm missing something or can clarify.

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset 
> identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not 
> the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as 
> the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for 
> the starting and ending offsets (either the offsets themselves, or the 
> SPARK-17829 string/json representation).  That would be an API change, but if 
> there's another way to map batch ids to offset representations without 
> changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a 
> job as e.g. the Kafka dstream gives, because Sinks are the main place that 
> should need them.
> After SPARK-17829 is complete and offsets have a .json method, an api for 
> this ticket might look like
> {code}
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
> }
> {code}
> where start and end were provided by StreamExecution.runBatch using 
> committedOffsets and availableOffsets.  
> I'm not 100% certain that the offsets in the seq could always be mapped back 
> to the correct source when restarting complicated multi-source jobs, but I 
> think it'd be sufficient.  Passing the string/json representation of the seq 
> instead of the seq itself would probably be sufficient as well, but the 
> convention of rendering a None as "-" in the json is maybe a little 
> idiosyncratic to parse, and the constant defining that is private.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18258) Sinks need access to offset representation

2016-11-04 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-18258:
---
Description: 
Transactional "exactly-once" semantics for output require storing an offset 
identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the 
actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the 
results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for 
the starting and ending offsets (either the offsets themselves, or the 
SPARK-17829 string/json representation).  That would be an API change, but if 
there's another way to map batch ids to offset representations without changing 
the Sink api that would work as well.  

I'm assuming we don't need the same level of access to offsets throughout a job 
as e.g. the Kafka dstream gives, because Sinks are the main place that should 
need them.

After SPARK-17829 is complete and offsets have a .json method, an api for this 
ticket might look like

{code}
trait Sink {
  def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
}
{code}

where start and end were provided by StreamExecution.runBatch using 
committedOffsets and availableOffsets.  

I'm not 100% certain that the offsets in the seq could always be mapped back to 
the correct source when restarting complicated multi-source jobs, but I think 
it'd be sufficient.  Passing the string/json representation of the seq instead 
of the seq itself would probably be sufficient as well, but the convention of 
rendering a None as "-" in the json is maybe a little idiosyncratic to parse, 
and the constant defining that is private.
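
As a rough, hedged sketch only (not part of the proposal itself): a JDBC-backed 
sink could commit a batch's results and its ending offsets in one transaction.  
It assumes the addBatch signature proposed above; the table name and the string 
rendering of the offsets are placeholders.

{code}
import java.sql.DriverManager
import org.apache.spark.sql.DataFrame

// Illustrative only; Sink and OffsetSeq are the proposed shapes from above.
class TransactionalJdbcSink(jdbcUrl: String) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame,
                        start: OffsetSeq, end: OffsetSeq): Unit = {
    val conn = DriverManager.getConnection(jdbcUrl)
    try {
      conn.setAutoCommit(false)
      // ... write the batch's rows to a results table (omitted) ...
      val stmt = conn.prepareStatement(
        "insert into stream_offsets (batch_id, end_offsets) values (?, ?)")
      stmt.setLong(1, batchId)
      stmt.setString(2, end.toString) // or the SPARK-17829 json rendering
      stmt.executeUpdate()
      conn.commit() // results and offsets become visible atomically
    } finally {
      conn.close()
    }
  }
}
{code}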

  was:
Transactional "exactly-once" semantics for output require storing an offset 
identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the 
actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the 
results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for 
the starting and ending offsets (either the offsets themselves, or the 
SPARK-17829 string/json representation).  That would be an API change, but if 
there's another way to map batch ids to offset representations without changing 
the Sink api that would work as well.  

I'm assuming we don't need the same level of access to offsets throughout a job 
as e.g. the Kafka dstream gives, because Sinks are the main place that should 
need them.


> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset 
> identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not 
> the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as 
> the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for 
> the starting and ending offsets (either the offsets themselves, or the 
> SPARK-17829 string/json representation).  That would be an API change, but if 
> there's another way to map batch ids to offset representations without 
> changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a 
> job as e.g. the Kafka dstream gives, because Sinks are the main place that 
> should need them.
> After SPARK-17829 is complete and offsets have a .json method, an api for 
> this ticket might look like
> {code}
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
> }
> {code}
> where start and end were provided by StreamExecution.runBatch using 
> committedOffsets and availableOffsets.  
> I'm not 100% certain that the offsets in the seq could always be mapped back 
> to the correct source when restarting complicated multi-source jobs, but I 
> think it'd be sufficient.  Passing the string/json representation of the seq 
> instead of the seq itself would probably be sufficient as well, but the 
> convention of rendering a None as "-" in the json is maybe a little 
> idiosyncratic to parse, and the constant defining that is private.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---

[jira] [Commented] (SPARK-18258) Sinks need access to offset representation

2016-11-04 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15637621#comment-15637621
 ] 

Cody Koeninger commented on SPARK-18258:


One obvious case is that if wherever checkpoint data is being stored fails or 
is corrupted, my downstream database can still be fine and have correct 
results, yet I have no way of restarting the job from a known point, because the 
batch id stored in the database is now meaningless.

Basically, I do not want to introduce another N points of failure in between 
Kafka and my data store.

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset 
> identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not 
> the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as 
> the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for 
> the starting and ending offsets (either the offsets themselves, or the 
> SPARK-17829 string/json representation).  That would be an API change, but if 
> there's another way to map batch ids to offset representations without 
> changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a 
> job as e.g. the Kafka dstream gives, because Sinks are the main place that 
> should need them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18258) Sinks need access to offset representation

2016-11-04 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15637576#comment-15637576
 ] 

Cody Koeninger commented on SPARK-18258:


The sink doesn't have to reason about equality of the representations.

It just has to be able to store those representations, in addition to the batch id 
if necessary, so that the job can be recovered if spark fails in a way that 
renders the batch id meaningless or the user wants to switch to a different 
streaming system.

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset 
> identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not 
> the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as 
> the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for 
> the starting and ending offsets (either the offsets themselves, or the 
> SPARK-17829 string/json representation).  That would be an API change, but if 
> there's another way to map batch ids to offset representations without 
> changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a 
> job as e.g. the Kafka dstream gives, because Sinks are the main place that 
> should need them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18272) Test topic addition for subscribePattern on Kafka DStream and Structured Stream

2016-11-04 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-18272:
--

 Summary: Test topic addition for subscribePattern on Kafka DStream 
and Structured Stream
 Key: SPARK-18272
 URL: https://issues.apache.org/jira/browse/SPARK-18272
 Project: Spark
  Issue Type: Bug
  Components: DStreams, Structured Streaming
Reporter: Cody Koeninger


We've had reports of the following sequence

- create subscribePattern stream that doesn't match any existing topics at the 
time stream starts
- add a topic that matches pattern
- expect that messages from that topic show up, but they don't

We don't seem to actually have tests that cover this case, so we should add them.
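
A minimal sketch of the scenario to cover, assuming a running StreamingContext 
ssc and a local broker; it uses the existing ConsumerStrategies.SubscribePattern 
API, and the topic/group names are illustrative:

{code}
import java.util.regex.Pattern
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Assumes a StreamingContext `ssc` and a broker at localhost:9092.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "subscribe-pattern-test")

// Pattern subscription created before any matching topic exists.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](
    Pattern.compile("pat-.*"), kafkaParams))

// Test outline: start ssc, then create a topic matching "pat-.*", send records,
// and assert they show up within the batch interval plus metadata.max.age.ms
// (the consumer's pattern refresh interval).
{code}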



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18258) Sinks need access to offset representation

2016-11-03 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-18258:
---
Description: 
Transactional "exactly-once" semantics for output require storing an offset 
identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the 
actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the 
results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for 
the starting and ending offsets (either the offsets themselves, or the 
SPARK-17829 string/json representation).  That would be an API change, but if 
there's another way to map batch ids to offset representations without changing 
the Sink api that would work as well.  

I'm assuming we don't need the same level of access to offsets throughout a job 
as e.g. the Kafka dstream gives, because Sinks are the main place that should 
need them.

  was:
Transactional "exactly-once" semantics for output require storing an offset 
identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the 
actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the
results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for 
the starting and ending offsets (either the offsets themselves, or the 
SPARK-17829 string/json representation).  That would be an API change, but if 
there's another way to map batch ids to offset representations without changing 
the Sink api that would work as well.  

I'm assuming we don't need the same level of access to offsets throughout a job 
as e.g. the Kafka dstream gives, because Sinks are the main place that should 
need them.


> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset 
> identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not 
> the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as 
> the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for 
> the starting and ending offsets (either the offsets themselves, or the 
> SPARK-17829 string/json representation).  That would be an API change, but if 
> there's another way to map batch ids to offset representations without 
> changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a 
> job as e.g. the Kafka dstream gives, because Sinks are the main place that 
> should need them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18258) Sinks need access to offset representation

2016-11-03 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-18258:
--

 Summary: Sinks need access to offset representation
 Key: SPARK-18258
 URL: https://issues.apache.org/jira/browse/SPARK-18258
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Reporter: Cody Koeninger


Transactional "exactly-once" semantics for output require storing an offset 
identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the 
actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the
results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for 
the starting and ending offsets (either the offsets themselves, or the 
SPARK-17829 string/json representation).  That would be an API change, but if 
there's another way to map batch ids to offset representations without changing 
the Sink api that would work as well.  

I'm assuming we don't need the same level of access to offsets throughout a job 
as e.g. the Kafka dstream gives, because Sinks are the main place that should 
need them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17938) Backpressure rate not adjusting

2016-11-02 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15629332#comment-15629332
 ] 

Cody Koeninger commented on SPARK-17938:


The direct stream isn't a receiver; receiver settings don't apply to it.

> Backpressure rate not adjusting
> ---
>
> Key: SPARK-17938
> URL: https://issues.apache.org/jira/browse/SPARK-17938
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Samy Dindane
>
> spark-streaming 2.0.1 and spark-streaming-kafka-0-10 version is 2.0.1. Same 
> behavior with 2.0.0 though.
> spark.streaming.kafka.consumer.poll.ms is set to 3
> spark.streaming.kafka.maxRatePerPartition is set to 10
> spark.streaming.backpressure.enabled is set to true
> `batchDuration` of the streaming context is set to 1 second.
> I consume a Kafka topic using KafkaUtils.createDirectStream().
> My system can handle 100k-record batches, but it'd take more than 1 second 
> to process them all. I'd thus expect the backpressure to reduce the number of 
> records that would be fetched in the next batch to keep the processing delay 
> inferior to 1 second.
> Only this does not happen and the rate of the backpressure stays the same: 
> stuck in `100.0`, no matter how the other variables change (processing time, 
> error, etc.).
> Here's a log showing how all these variables change but the chosen rate stays 
> the same: https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba (I 
> would have attached a file but I don't see how).
> Is this the expected behavior and I am missing something, or is this  a bug?
> I'll gladly help by providing more information or writing code if necessary.
> Thank you.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18212) Flaky test: org.apache.spark.sql.kafka010.KafkaSourceSuite.assign from specific offsets

2016-11-01 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15627838#comment-15627838
 ] 

Cody Koeninger commented on SPARK-18212:


So here's a heavily excerpted version of what I see happening in that log:

{code}
16/11/01 14:08:46.593 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO 
KafkaTestUtils:   Sent 34 to partition 2, offset 3
16/11/01 14:08:46.593 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO 
KafkaProducer: Closing the Kafka producer with timeoutMillis = 
9223372036854775807 ms.
16/11/01 14:08:46.596 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO 
KafkaTestUtils: Created consumer to get latest offsets


16/11/01 14:08:47.833 Executor task launch worker-2 ERROR Executor: Exception 
in task 1.0 in stage 29.0 (TID 142)
java.lang.AssertionError: assertion failed: Failed to get records for 
spark-kafka-source-a9485cc4-c83d-4e97-a20e-3960565b3fdb-335403166-executor topic-5-2 3 after polling for 512


16/11/01 14:08:49.252 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO 
KafkaTestUtils: Closed consumer to get latest offsets
16/11/01 14:08:49.252 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO 
KafkaSourceSuite: Added data, expected offset [(topic-5-0,4), (topic-5-1,4), 
(topic-5-2,4), (topic-5-3,4), (topic-5-4,4)]
{code}


We're waiting on the producer's send future for up to 10 seconds; it takes 
almost 3 seconds between when the producer send finishes and the consumer 
that's being used to verify the post-send offsets finishes; but in the meantime 
we're only waiting half a second for executor fetches.

It's really ugly, but probably the easiest way to make this less flaky is to 
increase the value of kafkaConsumer.pollTimeoutMs to the same order of 
magnitude being used for the other test waits.

[~zsxwing] unless you see anything else wrong in the log or have a better idea, 
I can put in a pr tomorrow to increase that poll timeout in tests.
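
For reference, a hedged sketch of where that timeout would be raised when the 
test reads from the source; the broker address and topic below are placeholders:

{code}
// Sketch only: kafkaConsumer.pollTimeoutMs bounds how long an executor polls
// for records before the assertion above fires.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress) // placeholder
  .option("subscribe", "topic-5")
  .option("kafkaConsumer.pollTimeoutMs", "10000") // same order as other test waits
  .load()
{code}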


> Flaky test: org.apache.spark.sql.kafka010.KafkaSourceSuite.assign from 
> specific offsets
> ---
>
> Key: SPARK-18212
> URL: https://issues.apache.org/jira/browse/SPARK-18212
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Reporter: Davies Liu
>
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.3/1968/testReport/junit/org.apache.spark.sql.kafka010/KafkaSourceSuite/assign_from_specific_offsets/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17935) Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module

2016-11-01 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15626774#comment-15626774
 ] 

Cody Koeninger commented on SPARK-17935:


Some other things to think about:
- are there any producer configurations you don't want to support?
- specifically, are you only going to support byte array serializers for 
writing key and value?
- if you're only supporting byte array, how do you clearly document for users 
how to handle their common use case (i.e. I have one string column I want to be 
the key, and the other columns should be json name/value pairs in the message)?
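
For that last case, a hedged sketch of what the documentation might show, 
assuming byte array serializers and a DataFrame df with an illustrative "id" 
column as the key:

{code}
import org.apache.spark.sql.functions.{col, struct, to_json}

// Key = the single string column, cast to bytes.
// Value = all remaining columns rendered as a json object, cast to bytes.
val keyed = df.select(
  col("id").cast("binary").as("key"),
  to_json(struct(df.columns.filter(_ != "id").map(col): _*))
    .cast("binary").as("value"))
{code}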

> Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
> --
>
> Key: SPARK-17935
> URL: https://issues.apache.org/jira/browse/SPARK-17935
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Streaming
>Affects Versions: 2.0.0
>Reporter: zhangxinyu
>
> Now spark already supports kafkaInputStream. It would be useful to add 
> `KafkaForeachWriter` to output results to kafka in structured streaming 
> module.
> `KafkaForeachWriter.scala` is put in external kafka-0.8.0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17935) Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module

2016-10-27 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15612663#comment-15612663
 ] 

Cody Koeninger commented on SPARK-17935:


So the main thing to point out is that Kafka producers currently aren't 
idempotent, so this sink can't be fault-tolerant.

Regarding the design doc, couple of comments

- KafkaSinkRDD  Why is this necessary?  It seems like KafkaSink should do 
basically the same thing as the existing ForeachSink class

- CachedKafkaProducer  Why is this necessary?  A singleton producer per JVM is 
generally what's recommended by kafka docs.
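
A minimal sketch of the per-JVM singleton pattern, assuming byte array 
serializers are already set in the properties; the object name is illustrative, 
not an existing Spark class:

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

object ProducerSingleton {
  @volatile private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _

  def getOrCreate(props: Properties): KafkaProducer[Array[Byte], Array[Byte]] =
    synchronized {
      if (producer == null) {
        producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
        sys.addShutdownHook { producer.flush(); producer.close() } // clean up on JVM exit
      }
      producer
    }
}
{code}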




> Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
> --
>
> Key: SPARK-17935
> URL: https://issues.apache.org/jira/browse/SPARK-17935
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Streaming
>Affects Versions: 2.0.0
>Reporter: zhangxinyu
>
> Now spark already supports kafkaInputStream. It would be useful to add 
> `KafkaForeachWriter` to output results to kafka in structured streaming 
> module.
> `KafkaForeachWriter.scala` is put in external kafka-0.8.0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17829) Stable format for offset log

2016-10-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15610399#comment-15610399
 ] 

Cody Koeninger commented on SPARK-17829:


I'm not telling you to do it that way, just asking if you had considered it.  
The general advantages of typeclasses are separation of concerns (do all these 
classes really need to know about json?) and inductive definitions for free 
(given a serializer for the elements, you get a serializer for any container of 
nested serializable things).  If all the stuff you're looking at modifying 
already knows about java serialization, it may not be a big deal though.

Specifically about the using a seq instead of array for compactible file 
stream, isn't there an existing warning in the code as to why that's using an 
array, due to pathological behavior on large linked lists?
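
To make the typeclass point concrete, a small sketch with invented names: the 
json knowledge lives in one place, and the instance for a container is derived 
from the instance for its elements.

{code}
trait JsonSer[A] { def toJson(a: A): String }

object JsonSer {
  implicit val longSer: JsonSer[Long] = new JsonSer[Long] {
    def toJson(a: Long): String = a.toString
  }
  // Inductive step: a serializer for Seq[A] given one for A.
  implicit def seqSer[A](implicit ev: JsonSer[A]): JsonSer[Seq[A]] =
    new JsonSer[Seq[A]] {
      def toJson(as: Seq[A]): String =
        as.map(ev.toJson).mkString("[", ",", "]")
    }
}

def render[A](a: A)(implicit ev: JsonSer[A]): String = ev.toJson(a)
// render(Seq(1L, 2L, 3L)) == "[1,2,3]"
{code}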

> Stable format for offset log
> 
>
> Key: SPARK-17829
> URL: https://issues.apache.org/jira/browse/SPARK-17829
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>Assignee: Tyson Condie
>
> Currently we use java serialization for the WAL that stores the offsets 
> contained in each batch.  This has two main issues:
>  - It can break across spark releases (though this is not the only thing 
> preventing us from upgrading a running query)
>  - It is unnecessarily opaque to the user.
> I'd propose we require offsets to provide a user readable serialization and 
> use that instead.  JSON is probably a good option.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17829) Stable format for offset log

2016-10-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15609653#comment-15609653
 ] 

Cody Koeninger commented on SPARK-17829:


Have you considered using a typeclass?

> Stable format for offset log
> 
>
> Key: SPARK-17829
> URL: https://issues.apache.org/jira/browse/SPARK-17829
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>Assignee: Tyson Condie
>
> Currently we use java serialization for the WAL that stores the offsets 
> contained in each batch.  This has two main issues:
>  - It can break across spark releases (though this is not the only thing 
> preventing us from upgrading a running query)
>  - It is unnecessarily opaque to the user.
> I'd propose we require offsets to provide a user readable serialization and 
> use that instead.  JSON is probably a good option.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.1.0

2016-10-21 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-18057:
--

 Summary: Update structured streaming kafka from 10.0.1 to 10.1.0
 Key: SPARK-18057
 URL: https://issues.apache.org/jira/browse/SPARK-18057
 Project: Spark
  Issue Type: Sub-task
Reporter: Cody Koeninger


There are a couple of relevant KIPs here, 
https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18056) Update KafkaDStreams from 10.0.1 to 10.1.0

2016-10-21 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-18056:
---
Description: There are a couple of relevant KIPs here, 
https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html

> Update KafkaDStreams from 10.0.1 to 10.1.0
> --
>
> Key: SPARK-18056
> URL: https://issues.apache.org/jira/browse/SPARK-18056
> Project: Spark
>  Issue Type: Improvement
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18056) Update KafkaDStreams from 10.0.1 to 10.1.0

2016-10-21 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-18056:
--

 Summary: Update KafkaDStreams from 10.0.1 to 10.1.0
 Key: SPARK-18056
 URL: https://issues.apache.org/jira/browse/SPARK-18056
 Project: Spark
  Issue Type: Improvement
Reporter: Cody Koeninger






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17829) Stable format for offset log

2016-10-20 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15593000#comment-15593000
 ] 

Cody Koeninger commented on SPARK-17829:


At least with regard to kafka offsets, it might be good to keep this the same 
format as in SPARK-17812
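
For reference, that per-topic, per-partition json shape (see the SPARK-17812 
description elsewhere in this archive; -1 and -2 stand for latest and earliest) 
looks like:

{noformat}
{"topicFoo": {"0": 1234, "1": -2}, "topicBar": {"0": -1}}
{noformat}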

> Stable format for offset log
> 
>
> Key: SPARK-17829
> URL: https://issues.apache.org/jira/browse/SPARK-17829
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>Assignee: Tyson Condie
>
> Currently we use java serialization for the WAL that stores the offsets 
> contained in each batch.  This has two main issues:
>  - It can break across spark releases (though this is not the only thing 
> preventing us from upgrading a running query)
>  - It is unnecessarily opaque to the user.
> I'd propose we require offsets to provide a user readable serialization and 
> use that instead.  JSON is probably a good option.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18033) Deprecate TaskContext.partitionId

2016-10-20 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-18033:
--

 Summary: Deprecate TaskContext.partitionId
 Key: SPARK-18033
 URL: https://issues.apache.org/jira/browse/SPARK-18033
 Project: Spark
  Issue Type: Improvement
Reporter: Cody Koeninger


Mark TaskContext.partitionId as deprecated, because it doesn't always reflect 
the physical index at the time the RDD is created.  Add a 
foreachPartitionWithIndex method to mirror the existing mapPartitionsWithIndex 
method.

For background, see

http://apache-spark-developers-list.1001551.n3.nabble.com/PSA-TaskContext-partitionId-the-actual-logical-partition-index-td19524.html





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2016-10-18 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15586417#comment-15586417
 ] 

Cody Koeninger commented on SPARK-17147:


If that's something you're seeing regularly, probably worth bringing it up on 
the mailing list, with a full stacktrace and whatever background you have

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
> --
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Robert Conrad
>
> When Kafka does log compaction offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2016-10-18 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15586397#comment-15586397
 ] 

Cody Koeninger commented on SPARK-17147:


Then no, this issue is unlikely to affect you unless there's something wrong 
with your topic.

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
> --
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Robert Conrad
>
> When Kafka does log compaction offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2016-10-18 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17147:
---
Summary: Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive 
Offsets (i.e. Log Compaction)  (was: Spark Streaming Kafka 0.10 Consumer Can't 
Handle Non-consecutive Offsets)

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
> --
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Robert Conrad
>
> When Kafka does log compaction offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets

2016-10-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15584172#comment-15584172
 ] 

Cody Koeninger commented on SPARK-17147:


Well, are you using compacted topics?

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
> 
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Robert Conrad
>
> When Kafka does log compaction offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-15 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578894#comment-15578894
 ] 

Cody Koeninger commented on SPARK-17812:


As you just said yourself, assign doesn't mean you necessarily know the exact 
starting offsets you want.

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>Assignee: Cody Koeninger
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions
> currently agreed on plan:
> Mutually exclusive subscription options (only assign is new to this ticket)
> {noformat}
> .option("subscribe","topicFoo,topicBar")
> .option("subscribePattern","topic.*")
> .option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
> {noformat}
> where assign can only be specified that way, no inline offsets
> Single starting position option with three mutually exclusive types of value
> {noformat}
> .option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 
> 1234, "1": -2}, "topicBar":{"0": -1}}""")
> {noformat}
> startingOffsets with json fails if any topicpartition in the assignments 
> doesn't have an offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17935) Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module

2016-10-15 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578141#comment-15578141
 ] 

Cody Koeninger commented on SPARK-17935:


Why is this in kafka-0-8, when we haven't resolved (for the third time) whether 
we're even continuing to work on 0.8?  Those modules have conflicting 
dependencies as is.

> Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
> --
>
> Key: SPARK-17935
> URL: https://issues.apache.org/jira/browse/SPARK-17935
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Streaming
>Affects Versions: 2.0.0
>Reporter: zhangxinyu
>
> Now spark already supports kafkaInputStream. It would be useful to add 
> `KafkaForeachWriter` to output results to kafka in structured streaming 
> module.
> `KafkaForeachWriter.scala` is put in external kafka-0.8.0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17938) Backpressure rate not adjusting

2016-10-15 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578133#comment-15578133
 ] 

Cody Koeninger commented on SPARK-17938:


There was pretty extensive discussion of this on list, should link or summarize 
it.

Couple of things here:

100 is the default minimum rate for the PID rate estimator.  If you're willing 
to write code, put more logging in to determine why that rate isn't being 
configured, or hardcode it to a different number.  I have successfully adjusted 
that rate using spark configuration.

The other thing is that if your system takes way longer than 1 second to 
process 100k records, 100k obviously isn't a reasonable max.  Many large batches 
will be defined while that first batch is running, before back pressure is 
involved at all.  Try a lower max.
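
A minimal sketch of the configuration being referred to; the keys are standard 
Spark settings, the values are only examples:

{code}
val conf = new org.apache.spark.SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.pid.minRate", "10")    // default minimum is 100
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // pick a max the job can clear
{code}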

> Backpressure rate not adjusting
> ---
>
> Key: SPARK-17938
> URL: https://issues.apache.org/jira/browse/SPARK-17938
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Samy Dindane
>
> spark-streaming 2.0.1 and spark-streaming-kafka-0-10 version is 2.0.1. Same 
> behavior with 2.0.0 though.
> spark.streaming.kafka.consumer.poll.ms is set to 3
> spark.streaming.kafka.maxRatePerPartition is set to 10
> spark.streaming.backpressure.enabled is set to true
> `batchDuration` of the streaming context is set to 1 second.
> I consume a Kafka topic using KafkaUtils.createDirectStream().
> My system can handle 100k-record batches, but it'd take more than 1 second 
> to process them all. I'd thus expect the backpressure to reduce the number of 
> records that would be fetched in the next batch to keep the processing delay 
> inferior to 1 second.
> Only this does not happen and the rate of the backpressure stays the same: 
> stuck in `100.0`, no matter how the other variables change (processing time, 
> error, etc.).
> Here's a log showing how all these variables change but the chosen rate stays 
> the same: https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba (I 
> would have attached a file but I don't see how).
> Is this the expected behavior and am I missing something, or is this a bug?
> I'll gladly help by providing more information or writing code if necessary.
> Thank you.






[jira] [Commented] (SPARK-17813) Maximum data per trigger

2016-10-14 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15577037#comment-15577037
 ] 

Cody Koeninger commented on SPARK-17813:


To be clear, the current direct stream (and as a result the structured stream) 
straight up will not work with compacted topics, because of the assumption that 
offset ranges are contiguous.  There's a ticket for it, SPARK-17147, with a 
prototype solution, waiting for feedback from a user on it.

So for a global maxOffsetsPerTrigger, are you saying a Spark configuration?  Is 
there a reason not to make that a maxRowsPerTrigger (or messages, or whatever 
name) so that it can potentially be reused by other sources?  For this, I think 
a proportional distribution of offsets shouldn't be too hard.  I can pick it up 
once the assign stuff is stabilized.
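
As a rough sketch of what "proportional distribution" could mean here (the function name and signature are illustrative, not anything in Spark):

{noformat}
// Split a global cap across topicpartitions in proportion to their lag.
import org.apache.kafka.common.TopicPartition

def proportionalLimits(
    from: Map[TopicPartition, Long],    // next offset to read, per topicpartition
    until: Map[TopicPartition, Long],   // latest available offset, per topicpartition
    maxOffsetsPerTrigger: Long): Map[TopicPartition, Long] = {
  val lag = from.map { case (tp, begin) => tp -> math.max(0L, until(tp) - begin) }
  val totalLag = lag.values.sum
  if (totalLag <= maxOffsetsPerTrigger) {
    until  // under the cap, take everything that's available
  } else {
    lag.map { case (tp, l) =>
      // each partition gets a share of the cap proportional to its lag
      val share = (l.toDouble / totalLag * maxOffsetsPerTrigger).toLong
      tp -> (from(tp) + share)
    }
  }
}
{noformat}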

> Maximum data per trigger
> 
>
> Key: SPARK-17813
> URL: https://issues.apache.org/jira/browse/SPARK-17813
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> At any given point in a streaming query execution, we process all available 
> data.  This maximizes throughput at the cost of latency.  We should add 
> something similar to the {{maxFilesPerTrigger}} option available for files.






[jira] [Updated] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17812:
---
Description: 
Right now you can only run a Streaming Query starting from either the earliest 
or latest offsets available at the moment the query is started.  Sometimes 
this is a lot of data.  It would be nice to be able to do the following:
 - seek to user specified offsets for manually specified topicpartitions

currently agreed on plan:

Mutually exclusive subscription options (only assign is new to this ticket)
{noformat}
.option("subscribe","topicFoo,topicBar")
.option("subscribePattern","topic.*")
.option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}

where assign can only be specified that way, no inline offsets

Single starting position option with three mutually exclusive types of value
{noformat}
.option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 1234, 
"1": -2}, "topicBar":{"0": -1}}""")
{noformat}

startingOffsets with json fails if any topicpartition in the assignments 
doesn't have an offset.
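
For illustration only (not part of the agreed wording above), combining the two options on a reader might look like the following, assuming a SparkSession named spark and a placeholder broker address:

{noformat}
// Illustrative only: broker, topics, partitions and offsets are placeholders.
// Every assigned topicpartition appears in the startingOffsets json
// (-2 = earliest, -1 = latest), since it fails otherwise.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
  .option("startingOffsets",
    """{"topicfoo": {"0": 1234, "1": -2}, "topicbar": {"0": -1, "1": -2}}""")
  .load()
{noformat}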


  was:
Right now you can only run a Streaming Query starting from either the earliest 
or latest offsets available at the moment the query is started.  Sometimes 
this is a lot of data.  It would be nice to be able to do the following:
 - seek to user specified offsets for manually specified topicpartitions


> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>Assignee: Cody Koeninger
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latest offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions
> currently agreed on plan:
> Mutually exclusive subscription options (only assign is new to this ticket)
> {noformat}
> .option("subscribe","topicFoo,topicBar")
> .option("subscribePattern","topic.*")
> .option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
> {noformat}
> where assign can only be specified that way, no inline offsets
> Single starting position option with three mutually exclusive types of value
> {noformat}
> .option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 
> 1234, "1": -2}, "topicBar":{"0": -1}}""")
> {noformat}
> startingOffsets with json fails if any topicpartition in the assignments 
> doesn't have an offset.






[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-14 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15577022#comment-15577022
 ] 

Cody Koeninger commented on SPARK-17812:


Assign is useful, otherwise you have no way of consuming only particular 
partitions of a topic.

Yeah, I just ended up using the Jackson tree model directly; as you said, the 
catalyst stuff isn't really applicable.

Branch with the initial implementation is at 
https://github.com/koeninger/spark-1/tree/SPARK-17812 , will send a PR once I 
have some tests... trying to figure out if there's a reasonable way of unit 
testing offset out of range, but may just give up on that if it seems flaky.
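
A rough sketch of the Jackson tree-model approach, for reference (this is not the code in that branch):

{noformat}
// Parse the "assign" json, e.g. {"topicfoo": [0, 1],"topicbar": [0, 1]},
// into TopicPartitions using Jackson's tree model.
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

def parseAssign(json: String): Array[TopicPartition] = {
  val mapper = new ObjectMapper()
  val root = mapper.readTree(json)
  root.fields.asScala.flatMap { entry =>
    entry.getValue.elements.asScala.map(p => new TopicPartition(entry.getKey, p.asInt))
  }.toArray
}
{noformat}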

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>Assignee: Cody Koeninger
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latest offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions






[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost).   It's possible to separate 
this into offset too small and offset too large, but I'm not sure it matters 
for us.

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per 
topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
*Earliest* above)  In general, I see no reason this couldn't specify Latest as 
an option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: seems like it should be *Fail* or 
*Earliest*, based on failOnDataLoss, but it looks like this setting is 
currently ignored, and the executor will just fail...
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  ?
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:   this _probably_ doesn't happen, because 
we're doing explicit seeks to the specified position
#* Offset out of range on executor:  ?

I've probably missed something, chime in.
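
To pin down what the failOnDataLoss half of that matrix means in code terms, a hedged sketch (the function and exception names are illustrative, not Spark's):

{noformat}
// failOnDataLoss=true means Fail; false means fall back to the Earliest
// available offset. Names here are illustrative only.
import org.apache.kafka.common.TopicPartition

class DataLossException(msg: String) extends RuntimeException(msg)

def resolveOutOfRange(
    tp: TopicPartition,
    requested: Long,
    earliest: Long,
    failOnDataLoss: Boolean): Long = {
  if (failOnDataLoss) {
    throw new DataLossException(s"offsets for $tp starting at $requested have been lost")
  } else {
    earliest
  }
}
{noformat}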


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost).   It's possible to separate 
this into offset too small and offset too large, but I'm not sure it matters 
for us.

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per 
topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
*Earliest* above)  In general, I see no reason this couldn't specify Latest as 
an option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during qu

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost).   It's possible to separate 
this into offset too small and offset too large, but I'm not sure it matters 
for us.

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per 
topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
*Earliest* above)  In general, I see no reason this couldn't specify Latest as 
an option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: seems like it should be *Fail* or 
*Earliest*, based on failOnDataLoss, but it looks like this setting is 
currently ignored, and the executor will just fail...
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  ?
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:   this _probably_ doesn't happen, because 
we're doing explicit seeks to the specified position
#* Offset out of range on executor:  ?

I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost).   It's possible to separate 
this into offset too small and offset too large, but I'm not sure it matters 
for us.

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per 
topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
*Earliest* above)  In general, I see no reason this couldn't specify Latest as 
an option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during que

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost).   It's possible to separate 
this into offset too small and offset too large, but I'm not sure it matters 
for us.

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per 
topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
*Earliest* above)  In general, I see no reason this couldn't specify Latest as 
an option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: *Fail* or *Earliest*, based on 
failOnDataLoss.
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:   this _probably_ doesn't happen, because 
we're doing explicit seeks to the specified position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per 
topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
*Earliest* above)  In general, I see no reason this couldn't specify Latest as 
an option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in th

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per 
topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
*Earliest* above)  In general, I see no reason this couldn't specify Latest as 
an option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: *Fail* or *Earliest*, based on 
failOnDataLoss.
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:   this _probably_ doesn't happen, because 
we're doing explicit seeks to the specified position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per 
topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
*Earliest* above)  In general, I see no reason this couldn't specify Latest as 
an option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# Earliest position in log
# Latest position in log
# Fail and kill the query
# Checkpoint position
# User specified per topicpartition
# Kafka commit log.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# Timestamp.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# X offsets before or after latest / earliest position.  Currently unsupported. 
 I think the semantics of this are super unclear by comparison with timestamp, 
given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is earliest or latest, use that.  If 
startingOffsets is perTopicpartition, and the new partition isn't in the map, 
Fail.  Note that this is effectively indistinguishable from new partition 
during query, because partitions may have changed in between pre-query 
configuration and query start, but we treat it differently, and users in this 
case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: Fail or Earliest, based on failOnDataLoss.
# During query
#* New partition:  Earliest, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  Fail or Earliest, based on failOnDataLoss
# At query restart 
#* New partition: Checkpoint, fall back to Earliest.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:  Fail or Earliest, based on FailOnDataLoss
#* Offset out of range on executor:  Fail or Earliest, based on FailOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# Earliest position in log
# Latest position in log
# Fail and kill the query
# Checkpoint position
# User specified per topicpartition
# Kafka commit log.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# Timestamp.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# X offsets before or after latest / earliest position.  Currently unsupported. 
 I think the semantics of this are super unclear by comparison with timestamp, 
given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is earliest or latest, use that.  If 
startingOffsets is perTopicpartition, and the new partition isn't in the map, 
Fail.  Note that this is effectively indistinguishable from new partition 
during query, because partitions may have changed in between pre-query 
configuration and query start, but we treat it differently, and users in this 
case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on exe

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR User specified json per topicpartition 
 (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: *Fail* or *Earliest*, based on 
failOnDataLoss.
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:  *Fail* or *Earliest*, based on failOnDataLoss
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# *New partition* is discovered
# *Offset out of range* (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR User specified json per topicpartition 
 (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want t

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per 
topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
*Earliest* above)  In general, I see no reason this couldn't specify Latest as 
an option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: *Fail* or *Earliest*, based on 
failOnDataLoss.
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:  *Fail* or *Earliest*, based on failOnDataLoss
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR User specified json per topicpartition 
 (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# *New partition* is discovered
# *Offset out of range* (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR User specified json per topicpartition 
 (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: *Fail* or *Earliest*, based on 
failOnDataLoss.
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:  *Fail* or *Earliest*, based on failOnDataLoss
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# *New partition* is discovered
# *Offset out of range* (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at st

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# *New partition* is discovered
# *Offset out of range* (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# *X offsets* before or after latest / earliest position.  Currently 
unsupported.  I think the semantics of this are super unclear by comparison 
with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
startingOffsets is *User specified* perTopicpartition, and the new partition 
isn't in the map, *Fail*.  Note that this is effectively indistinguishable from 
new partition during query, because partitions may have changed in between 
pre-query configuration and query start, but we treat it differently, and users 
in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: *Fail* or *Earliest*, based on 
failOnDataLoss.
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss
# At query restart 
#* New partition: *Checkpoint*, fall back to *Earliest*.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:  *Fail* or *Earliest*, based on failOnDataLoss
#* Offset out of range on executor:  *Fail* or *Earliest*, based on 
failOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# Earliest position in log
# Latest position in log
# Fail and kill the query
# Checkpoint position
# User specified per topicpartition
# Kafka commit log.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# Timestamp.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# X offsets before or after latest / earliest position.  Currently unsupported. 
 I think the semantics of this are super unclear by comparison with timestamp, 
given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is earliest or latest, use that.  If 
startingOffsets is perTopicpartition, and the new partition isn't in the map, 
Fail.  Note that this is effectively indistinguishable from new partition 
during query, because partitions may have changed in between pre-query 
configuration and query start, but we treat it differently, and users in this 
case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
ea

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# Earliest position in log
# Latest position in log
# Fail and kill the query
# Checkpoint position
# User specified per topicpartition
# Kafka commit log.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# Timestamp.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# X offsets before or after latest / earliest position.  Currently unsupported. 
 I think the semantics of this are super unclear by comparison with timestamp, 
given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is earliest or latest, use that.  If 
startingOffsets is perTopicpartition, and the new partition isn't in the map, 
Fail.  Note that this is effectively indistinguishable from new partition 
during query, because partitions may have changed in between pre-query 
configuration and query start, but we treat it differently, and users in this 
case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: Fail or Earliest, based on failOnDataLoss.
# During query
#* New partition:  Earliest, only.  This seems to be by fiat, I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this ??probably?? doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  Fail or Earliest, based on failOnDataLoss
# At query restart 
#* New partition: Checkpoint, fall back to Earliest.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:  Fail or Earliest, based on FailOnDataLoss
#* Offset out of range on executor:  Fail or Earliest, based on FailOnDataLoss


I've probably missed something, chime in.


  was:
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# Earliest position in log
# Latest position in log
# Fail and kill the query
# Checkpoint position
# User specified per topicpartition
# Kafka commit log.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# Timestamp.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# X offsets before or after latest / earliest position.  Currently unsupported. 
 I think the semantics of this are super unclear by comparison with timestamp, 
given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is earliest or latest, use that.  If 
startingOffsets is perTopicpartition, and the new partition isn't in the map, 
Fail.  Note that this is effectively undistinguishable from 2a below, because 
partitions may have changed in between pre-query configuration and query start, 
but we treat it differently, and users in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: Fail or Ea

[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17937:
---
Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost)

Possible sources of offsets:
# Earliest position in log
# Latest position in log
# Fail and kill the query
# Checkpoint position
# User specified per topicpartition
# Kafka commit log.  Currently unsupported.  This means users who want to 
migrate from existing kafka jobs need to jump through hoops.  Even if we never 
want to support it, as soon as we take on SPARK-17815 we need to make sure 
Kafka commit log state is clearly documented and handled.
# Timestamp.  Currently unsupported.  This could be supported with old, 
inaccurate Kafka time api, or upcoming time index
# X offsets before or after latest / earliest position.  Currently unsupported. 
 I think the semantics of this are super unclear by comparison with timestamp, 
given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: earliest OR latest OR json per topicpartition  (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies 
Earliest above)  In general, I see no reason this couldn't specify Latest as an 
option.

Possible lifecycle times in which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is earliest or latest, use that.  If 
startingOffsets is perTopicpartition and the new partition isn't in the map, 
Fail.  Note that this is effectively indistinguishable from 2a below, because 
partitions may have changed between pre-query configuration and query start, 
but we treat it differently, and users in this case are SOL.
#* Offset out of range on driver: We don't technically have behavior for this 
case yet.  Could use the value of failOnDataLoss, but it's possible people may 
want to know at startup that something was wrong, even if they're ok with 
earliest for a during-query out of range
#* Offset out of range on executor: Fail or Earliest, based on failOnDataLoss.
# During query
#* New partition:  Earliest, only.  This seems to be by fiat; I see no reason 
this can't be configurable.
#* Offset out of range on driver:  this ??probably?? doesn't happen, because 
we're doing explicit seeks to the latest position
#* Offset out of range on executor:  Fail or Earliest, based on failOnDataLoss
# At query restart 
#* New partition: Checkpoint, falling back to Earliest.  Again, no reason this 
couldn't be configurable to fall back to Latest
#* Offset out of range on driver:  Fail or Earliest, based on failOnDataLoss
#* Offset out of range on executor:  Fail or Earliest, based on failOnDataLoss


I've probably missed something, chime in.


> Clarify Kafka offset semantics for Structured Streaming
> ---
>
> Key: SPARK-17937
> URL: https://issues.apache.org/jira/browse/SPARK-17937
> Project: Spark
>  Issue Type: Sub-task
>Reporter: Cody Koeninger
>
> Possible events for which offsets are needed:
> # New partition is discovered
> # Offset out of range (aka, data has been lost)
> Possible sources of offsets:
> # Earliest position in log
> # Latest position in log
> # Fail and kill the query
> # Checkpoint position
> # User specified per topicpartition
> # Kafka commit log.  Currently unsupported.  This means users who want to 
> migrate from existing kafka jobs need to jump through hoops.  Even if we 
> never want to support it, as soon as we take on SPARK-17815 we need to make 
> sure Kafka commit log state is clearly documented and handled.
> # Timestamp.  Currently unsupported.  This could be supported with old, 
> inaccurate Kafka time api, or upcoming time index
> # X offsets before or after latest / earliest position.  Currently 
> unsupported.  I think the semantics of this are super unclear by comparison 
> with timestamp, given that Kafka doesn't have a single range of offsets.
> Currently allowed pre-query configuration, all "ORs" are exclusive:
> # startingOffsets: earliest OR latest OR json per topicpartition  
> (SPARK-17812)
> # failOnDataLoss: true (which implies Fail above) OR false (which implies 
> Earliest above)  In general, I see no reason this couldn't specify Latest as 
> an option.
> Possible lifecycle times in which an offset-related event may happen:
> # At initial query start
> #* New partition: if startingOffsets is earliest or latest, use that.  If 
> startingOffsets is perTopicpartition, and the new partition isn't in the map, 
> Fail.  Note that this is effectively undistinguishable from 2a below, because 
> partitions may have changed in between pre-query configuration and query 
> start, but we treat it diff

[jira] [Created] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming

2016-10-14 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-17937:
--

 Summary: Clarify Kafka offset semantics for Structured Streaming
 Key: SPARK-17937
 URL: https://issues.apache.org/jira/browse/SPARK-17937
 Project: Spark
  Issue Type: Sub-task
Reporter: Cody Koeninger






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573843#comment-15573843
 ] 

Cody Koeninger commented on SPARK-17812:


So I think this is what we're agreed on:

Mutually exclusive subscription options (only assign is new to this ticket)
{noformat}
.option("subscribe","topicFoo,topicBar")
.option("subscribePattern","topic.*")
.option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}

where assign can only be specified that way, no inline offsets

Single starting position option with three mutually exclusive types of value
{noformat}
.option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 1234, 
"1": -2}, "topicBar":{"0": -1}}""")
{noformat}

startingOffsets with json fails if any topicpartition in the assignments 
doesn't have an offset.
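
For concreteness, a sketch of those options used together on a reader (assuming 
a SparkSession named spark; topics, broker address and offsets are made up, and 
in the json -2 means earliest, -1 means latest):

{noformat}
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("assign", """{"topicfoo": [0, 1], "topicbar": [0]}""")
  .option("startingOffsets", """{"topicfoo": {"0": 1234, "1": -2}, "topicbar": {"0": -1}}""")
  .load()
{noformat}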

Sound right?

I'll go ahead and start on it.  I'm assuming I should try to reuse some of the 
existing catalyst Jackson stuff and keep in mind a format that's potentially 
usable by the checkpoints as well?
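
As a rough sketch of the parsing side (assuming json4s, which Spark bundles; the 
actual implementation may reuse the catalyst Jackson utilities instead), the 
json could be turned into per-partition offsets like this:

{noformat}
import org.apache.kafka.common.TopicPartition
import org.json4s._
import org.json4s.jackson.JsonMethods

// e.g. """{"topicFoo": {"0": 1234, "1": -2}, "topicBar": {"0": -1}}"""
def parseStartingOffsets(json: String): Map[TopicPartition, Long] = {
  implicit val formats: Formats = DefaultFormats
  JsonMethods.parse(json).extract[Map[String, Map[String, Long]]].flatMap {
    case (topic, partitionOffsets) =>
      partitionOffsets.map { case (partition, offset) =>
        new TopicPartition(topic, partition.toInt) -> offset
      }
  }
}
{noformat}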

I don't think earliest / latest is too unclear as long as there's a way to get 
to the other knobs that auto.offset.reset (should have) provided. Punting the 
tunability of new partitions to another ticket sounds good.  


> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>Assignee: Cody Koeninger
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17813) Maximum data per trigger

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573806#comment-15573806
 ] 

Cody Koeninger commented on SPARK-17813:


So, issues to be worked out here (assuming we're still ignoring compacted topics):

maxOffsetsPerTrigger - how are these maximums distributed among partitions?  
What about skewed topics / partitions?

maxOffsetsPerTopicPartitionPerTrigger - (this isn't just hypothetical, e.g. 
SPARK-17510) If we do this, how is this configuration communicated? 

{noformat}
option("maxOffsetsPerTopicPartitionPerTrigger", """{"topicFoo":{"0":600}, 
"topicBar":{"0":300, "1": 600}}""")
{noformat}

{noformat}
option("maxOffsetsPerTopicPerTrigger", """{"topicFoo": 600, "topicBar": 300}""")
{noformat}
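
On the skew question, one purely illustrative way (not a proposal for specific 
Spark internals) to split a single global cap proportionally to each partition's 
backlog:

{noformat}
import org.apache.kafka.common.TopicPartition

// Sketch only: cap the 'until' offsets so the total records this trigger stays
// under maxOffsets, splitting the budget proportionally to each partition's lag.
def rateLimit(
    maxOffsets: Long,
    from: Map[TopicPartition, Long],
    until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  val lag = until.map { case (tp, end) => tp -> math.max(0L, end - from.getOrElse(tp, end)) }
  val total = lag.values.sum
  if (total <= maxOffsets) until
  else until.map { case (tp, end) =>
    val share = (maxOffsets.toDouble * lag(tp) / total).toLong
    tp -> math.min(end, from.getOrElse(tp, end) + share)
  }
}
{noformat}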



> Maximum data per trigger
> 
>
> Key: SPARK-17813
> URL: https://issues.apache.org/jira/browse/SPARK-17813
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> At any given point in a streaming query execution, we process all available 
> data.  This maximizes throughput at the cost of latency.  We should add 
> something similar to the {{maxFilesPerTrigger}} option available for files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573766#comment-15573766
 ] 

Cody Koeninger commented on SPARK-17812:


OK, failing on start is really annoying in the case of subscribePattern, but at 
least it's clear.  I think that's enough to get started on this ticket. Is 
anyone currently working on it, or can I do it?  Ryan seemed worried that it 
wouldn't get done in time for the next release.

It sounds like your current plan is to ignore whatever comes out of KAFKA-3370, 
which is fine as long as whatever you do is both clear and equally tunable.  
Clarity of semantics can't be the only criterion of an API: "You can only start 
at the latest offset, period" is clear, but it's a crap API.

{quote}
the only case where we lack sufficient tunability is "Where do I go when the 
current offsets are invalid due to retention?".
{quote}

No, you lack sufficient tunability as to where newly discovered partitions 
start.  Keep in mind that those partitions may have been discovered after a 
significant job downtime.  If the point of an API is to provide clear semantics 
to the user, it is not at all clear to me as a user how I can start those 
partitions at latest, which I know is possible in the underlying data model.

The reason I'm belaboring this point now is that you have chosen names 
(earliest, latest) for the API currently under discussion that are confusingly 
similar to the existing auto.offset.reset functionality, and you have provided 
knobs for some, but not all, of the things auto.offset.reset currently affects. 
This is going to confuse people; it already confuses me.



> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573563#comment-15573563
 ] 

Cody Koeninger commented on SPARK-17812:


So a short term question
- with your proposed interface, what, as a user, do you expect to happen when 
you specify startingOffsets for some but not all partitions?

A couple of medium term questions:
- Yes, auto.offset.reset is a mess.  Have you read 
https://issues.apache.org/jira/browse/KAFKA-3370
- What are you going to do when that ticket is resolved?  It should allow users 
to answer the questions you raised in very specific ways, that your interface 
does not.

And a really leading long term question:
- Is the purpose of your interface to do what you think users should be able to 
do, or what they need to be able to do?

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573479#comment-15573479
 ] 

Cody Koeninger commented on SPARK-17812:


While some decision is better than none, can you help me understand why you 
don't believe me that auto.offset.reset is orthogonal to specifying specific 
starting positions?  Or do you just not believe it's important?

The reason you guys used a different name from auto.offset.reset is that the 
Kafka project's semantics for it are inadequate. But they will fix that, and 
when they do, the fact that you have conflated two unrelated things into one 
configuration in your API is going to cause problems.



> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573432#comment-15573432
 ] 

Cody Koeninger edited comment on SPARK-17812 at 10/13/16 10:44 PM:
---

If you're seriously worried that people are going to get confused,

{noformat}
.option("defaultOffsets", "earliest" | "latest")
.option("specificOffsets", """{"topicFoo": {"0": 1234, "1", 4567}}""")
{noformat}

let those two at least not be mutually exclusive, and punt on the question of 
precedence until there's an actual startingTime or startingX ticket.




was (Author: c...@koeninger.org):
If you're seriously worried that people are going to get confused,

{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1", 4567}}""")
.option("defaultOffsets", "earliest" | "latest")
{noformat}

let those two at least not be mutually exclusive, and punt on the question of 
precedence until there's an actual startingTime or startingX ticket.



> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573432#comment-15573432
 ] 

Cody Koeninger commented on SPARK-17812:


If you're seriously worried that people are going to get confused,

{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1", 4567}}""")
.option("defaultOffsets", "earliest" | "latest")
{noformat}

let those two at least not be mutually exclusive, and punt on the question of 
precedence until there's an actual startingTime or startingX ticket.



> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573395#comment-15573395
 ] 

Cody Koeninger edited comment on SPARK-17812 at 10/13/16 10:25 PM:
---

1. We don't have lists, we have strings.  Regexes and valid topic names have 
overlaps (dot is the obvious one).

2. Mapping directly to kafka method names means we don't have to come up with 
some other (weird and possibly overlapping) name when they add more ways to 
subscribe, we just use theirs.

3. I think this "starting X mssages" is a mess with kafka semantics for the 
reasons both you and I have already expressed.  At any rate, I think Michael 
already clearly punted the "starting X messages" case to a different ticket.

4. I think it's more than sufficiently clear as suggested; no one is going to 
expect that a specific offset they provided will be overruled by a single 
general default.  The implementation is also crystal clear: seek to the 
position identified by startingTime, then seek to any specific offsets for 
specific partitions.

Yes, this is all bikeshedding, but it's bikeshedding that directly affects what 
people are actually able to do with the API.  Needlessly restricting it for 
reasons that have nothing to do with safety is just going to piss users off. 
Just because you don't have a use case that needs it doesn't mean you should 
arbitrarily prevent users from doing it.

Please, just choose something and let me build it so that people can actually 
use the thing by the next release.


was (Author: c...@koeninger.org):
1. we dont have lists, we have strings.  regexes and valid topic names have 
overlaps (dot is the obvious one).

2. Mapping directly to kafka method names means we don't have to come up with 
some other (weird and possibly overlapping) name when they add more ways to 
subscribe, we just use theirs.

3. I think this is a mess with kafka semantics for the reasons both you and I 
have already expressed.  At any rate, I think Michael already clearly punted 
the "starting X" case to a different topic.

4. I  think it's more than sufficiently clear as suggested, no one is going to 
expect that a specific offset they provided is going to be overruled by a 
general single default.   The implementation is also crystal clear - seek to 
the position identified by startingTime, then seek to any specific offsets for 
specific partitions

Yes, this is all bikeshedding, but it's bikeshedding that directly affects what 
people are actually able to do with the api.  Needlessly restricting it for 
reasons that have nothing to do with safety is just going to piss users off for 
no reason. Just because you don't have a use case that needs it, doesn't mean 
you should arbitrarily prevent users from doing it.

Please, just choose something and let me build it so that people can actually 
use the thing by the next release

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573395#comment-15573395
 ] 

Cody Koeninger commented on SPARK-17812:


1. We don't have lists, we have strings.  Regexes and valid topic names have 
overlaps (dot is the obvious one).

2. Mapping directly to kafka method names means we don't have to come up with 
some other (weird and possibly overlapping) name when they add more ways to 
subscribe, we just use theirs.

3. I think this is a mess with kafka semantics for the reasons both you and I 
have already expressed.  At any rate, I think Michael already clearly punted 
the "starting X" case to a different topic.

4. I  think it's more than sufficiently clear as suggested, no one is going to 
expect that a specific offset they provided is going to be overruled by a 
general single default.   The implementation is also crystal clear - seek to 
the position identified by startingTime, then seek to any specific offsets for 
specific partitions

Yes, this is all bikeshedding, but it's bikeshedding that directly affects what 
people are actually able to do with the api.  Needlessly restricting it for 
reasons that have nothing to do with safety is just going to piss users off for 
no reason. Just because you don't have a use case that needs it, doesn't mean 
you should arbitrarily prevent users from doing it.

Please, just choose something and let me build it so that people can actually 
use the thing by the next release

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17812:
---
Comment: was deleted

(was: One other slightly ugly thing...

{noformat}
// starting topicpartitions, no explicit offset
.option("assign", """{"topicfoo": [0, 1],"topicbar": [0, 1]}"""

// do you allow specifying with explicit offsets in the same config option? 
// or force it all into startingOffsetForRealzYo?
.option("assign", """{ "topicfoo" :{ "0": 1234, "1": 4567 }, "topicbar" : { 
"0": 1234, "1": 4567 }}""")
{noformat})

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573166#comment-15573166
 ] 

Cody Koeninger edited comment on SPARK-17812 at 10/13/16 9:17 PM:
--

Here's my concrete suggestion:

3 mutually exclusive ways of subscribing:

{noformat}
.option("subscribe","topicFoo,topicBar")
.option("subscribePattern","topic.*")
.option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}

where assign can only be specified that way, no inline offsets

2 non-mutually exclusive ways of specifying starting position, explicit 
startingOffsets obviously take priority:

{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1", 4567}}""")
.option("startingTime", "earliest" | "latest" | long)
{noformat}
where long is a timestamp, work to be done on that later.
Note that even kafka 0.8 has a (really crappy, based on log file modification 
time) api for time, so startingTime doesn't necessarily exclude pursuing 
timestamps later.
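
The precedence under this proposal is mechanical enough to sketch (names 
illustrative only):

{noformat}
import org.apache.kafka.common.TopicPartition

// Sketch only: an explicit per-partition offset wins; otherwise fall back to
// startingTime, which is "earliest", "latest", or a timestamp to be resolved.
def startingPositionFor(
    tp: TopicPartition,
    startingOffsets: Map[TopicPartition, Long],
    startingTime: String): Either[String, Long] =
  startingOffsets.get(tp) match {
    case Some(offset) => Right(offset)
    case None => Left(startingTime)
  }
{noformat}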




was (Author: c...@koeninger.org):
Here's my concrete suggestion:

3 mutually exclusive ways of subscribing:

{noformat}
.option("subscribe","topicFoo,topicBar")
.option("subscribePattern","topic.*")
.option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}

where assign can only be specified that way, no inline offsets

2 non-mutually exclusive ways of specifying starting position, explicit 
startingOffsets obviously take priority:

{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1", 4567}""")
.option("startingTime", "earliest" | "latest" | long)
{noformat}
where long is a timestamp, work to be done on that later.
Note that even kafka 0.8 has a (really crappy based on log file modification 
time) api for time so later pursuing timestamps startingTime doesn't 
necessarily exclude it



> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573166#comment-15573166
 ] 

Cody Koeninger commented on SPARK-17812:


Here's my concrete suggestion:

3 mutually exclusive ways of subscribing:

{noformat}
.option("subscribe","topicFoo,topicBar")
.option("subscribePattern","topic.*")
.option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}

where assign can only be specified that way, no inline offsets

2 non-mutually exclusive ways of specifying starting position, explicit 
startingOffsets obviously take priority:

{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1", 4567}""")
.option("startingTime", "earliest" | "latest" | long)
{noformat}
where long is a timestamp, work to be done on that later.
Note that even kafka 0.8 has a (really crappy based on log file modification 
time) api for time so later pursuing timestamps startingTime doesn't 
necessarily exclude it



> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572922#comment-15572922
 ] 

Cody Koeninger edited comment on SPARK-17812 at 10/13/16 8:33 PM:
--

Sorry, I didn't see this comment until just now.

X offsets back per partition is not a reasonable proxy for time when you're 
dealing with a stream that has multiple topics in it.  Agree we should break 
that out, focus on defining starting offsets in this ticket.

The concern with startingOffsets naming is that, because auto.offset.reset is 
orthogonal to specifying some offsets, you have a situation like this:

{noformat}
.format("kafka")
.option("subscribePattern", "topic.*")
.option("startingOffset", "latest")
.option("startingOffsetForRealzYo", """ { "topicfoo" : { "0": 1234, "1": 4567 
}, "topicbar" : { "0": 1234, "1": 4567 }}""")
{noformat}

where startingOffsetForRealzYo stands in for some more reasonable name that 
conveys it is specifying starting offsets, yet is not confusingly similar to 
startingOffset.

Trying to hack it all into one json as an alternative, with a "default" topic, 
means you're going to have to pick a key that isn't a valid topic, or add yet 
another layer of indirection.  It also makes it harder to make the format 
consistent with SPARK-17829 (which seems like a good thing to keep consistent, 
I agree)

Obviously I think you should just change the name, but it's your show.






was (Author: c...@koeninger.org):
Sorry, I didn't see this comment until just now.

X offsets back per partition is not a reasonable proxy for time when you're 
dealing with a stream that has multiple topics in it.  Agree we should break 
that out, focus on defining starting offsets in this ticket.

The concern with startingOffsets naming is that, because auto.offset.reset is 
orthogonal to specifying some offsets, you have a situation like this:

.format("kafka")
.option("subscribePattern", "topic.*")
.option("startingOffset", "latest")
.option("startingOffsetForRealzYo", """ { "topicfoo" : { "0": 1234, "1": 4567 
}, "topicbar" : { "0": 1234, "1": 4567 }}""")

where startingOffsetForRealzYo has a more reasonable name that conveys it is 
specifying starting offsets, yet is not confusingly similar to startingOffset

Trying to hack it all into one json as an alternative, with a "default" topic, 
means you're going to have to pick a key that isn't a valid topic, or add yet 
another layer of indirection.  It also makes it harder to make the format 
consistent with SPARK-17829 (which seems like a good thing to keep consistent, 
I agree)

Obviously I think you should just change the name, but it's your show.





> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573089#comment-15573089
 ] 

Cody Koeninger commented on SPARK-17812:


One other slightly ugly thing...

{noformat}
// starting topicpartitions, no explicit offset
.option("assign", """{"topicfoo": [0, 1],"topicbar": [0, 1]}"""

// do you allow specifying with explicit offsets in the same config option? 
// or force it all into startingOffsetForRealzYo?
.option("assign", """{ "topicfoo" :{ "0": 1234, "1": 4567 }, "topicbar" : { 
"0": 1234, "1": 4567 }}""")
{noformat}

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572922#comment-15572922
 ] 

Cody Koeninger commented on SPARK-17812:


Sorry, I didn't see this comment until just now.

X offsets back per partition is not a reasonable proxy for time when you're 
dealing with a stream that has multiple topics in it.  Agree we should break 
that out, focus on defining starting offsets in this ticket.

The concern with startingOffsets naming is that, because auto.offset.reset is 
orthogonal to specifying some offsets, you have a situation like this:

.format("kafka")
.option("subscribePattern", "topic.*")
.option("startingOffset", "latest")
.option("startingOffsetForRealzYo", """ { "topicfoo" : { "0": 1234, "1": 4567 
}, "topicbar" : { "0": 1234, "1": 4567 }}""")

where startingOffsetForRealzYo has a more reasonable name that conveys it is 
specifying starting offsets, yet is not confusingly similar to startingOffset

Trying to hack it all into one json as an alternative, with a "default" topic, 
means you're going to have to pick a key that isn't a valid topic, or add yet 
another layer of indirection.  It also makes it harder to make the format 
consistent with SPARK-17829 (which seems like a good thing to keep consistent, 
I agree)

Obviously I think you should just change the name, but it's your show.





> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17900) Mark the following Spark SQL APIs as stable

2016-10-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572119#comment-15572119
 ] 

Cody Koeninger commented on SPARK-17900:


Thanks for doing this, should make things clearer.  Those divisions make sense 
to me.

> Mark the following Spark SQL APIs as stable
> ---
>
> Key: SPARK-17900
> URL: https://issues.apache.org/jira/browse/SPARK-17900
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>
> Mark the following stable:
> Dataset/DataFrame
> - functions, since 1.3
> - ColumnName, since 1.3
> - DataFrameNaFunctions, since 1.3.1
> - DataFrameStatFunctions, since 1.4
> - UserDefinedFunction, since 1.3
> - UserDefinedAggregateFunction, since 1.5
> - Window and WindowSpec, since 1.4
> Data sources:
> - DataSourceRegister, since 1.5
> - RelationProvider, since 1.3
> - SchemaRelationProvider, since 1.3
> - CreatableRelationProvider, since 1.3
> - BaseRelation, since 1.3
> - TableScan, since 1.3
> - PrunedScan, since 1.3
> - PrunedFilteredScan, since 1.3
> - InsertableRelation, since 1.3
> Keep the following experimental / evolving:
> Data sources:
> - CatalystScan (tied to internal logical plans so it is not stable by 
> definition)
> Structured streaming:
> - all classes (introduced new in 2.0 and will likely change)
> Dataset typed operations (introduced in 1.6 and 2.0 and might change, 
> although probability is low)
> - all typed methods on Dataset
> - KeyValueGroupedDataset
> - o.a.s.sql.expressions.javalang.typed
> - o.a.s.sql.expressions.scalalang.typed
> - methods that return typed Dataset in SparkSession



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-15408) Spark streaming app crashes with NotLeaderForPartitionException

2016-10-12 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger closed SPARK-15408.
--
Resolution: Cannot Reproduce

> Spark streaming app crashes with NotLeaderForPartitionException 
> 
>
> Key: SPARK-15408
> URL: https://issues.apache.org/jira/browse/SPARK-15408
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.0
> Environment: Ubuntu 64 bit
>Reporter: Johny Mathew
>Priority: Critical
>
> We have a spark streaming application reading from kafka (with Kafka Direct 
> API) and it crashed with the exception shown in the next paragraph. We have a 
> 5 node kafka cluster with 19 partitions (replication factor 3). Even though 
> the spark application crashed, the other kafka consumer apps were running 
> fine. Only one of the 5 kafka nodes was not working correctly (it did not go 
> down).
> /opt/hadoop/bin/yarn application -status application_1463151451543_0007
> 16/05/13 20:09:56 INFO client.RMProxy: Connecting to ResourceManager at 
> /172.16.130.189:8050
> Application Report :
>   Application-Id : application_1463151451543_0007
>   Application-Name : com.ibm.alchemy.eventgen.EventGenMetrics
>   Application-Type : SPARK
>   User : stack
>   Queue : default
>   Start-Time : 1463155034571
>   Finish-Time : 1463155310520
>   Progress : 100%
>   State : FINISHED
>   Final-State : FAILED
>   Tracking-URL : N/A
>   RPC Port : 0
>   AM Host : 172.16.130.188
>   Aggregate Resource Allocation : 9562329 MB-seconds, 2393 vcore-seconds
>   Diagnostics : User class threw exception: 
> org.apache.spark.SparkException: 
> ArrayBuffer(kafka.common.NotLeaderForPartitionException, 
> kafka.common.NotLeaderForPartitionException, 
> kafka.common.NotLeaderForPartitionException, 
> kafka.common.NotLeaderForPartitionException, 
> kafka.common.NotLeaderForPartitionException, 
> kafka.common.NotLeaderForPartitionException, 
> kafka.common.NotLeaderForPartitionException, 
> kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: 
> Couldn't find leader offsets for Set([alchemy-metrics,17], 
> [alchemy-metrics,10], [alchemy-metrics,3], [alchemy-metrics,4], 
> [alchemy-metrics,9], [alchemy-metrics,15], [alchemy-metrics,18], 
> [alchemy-metrics,5]))
> We cleared checkpoint and started the application but it crashed again. Then 
> at the end we found out the misbehaving kafka node and restarted it which 
> fixed the problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15272) DirectKafkaInputDStream doesn't work with window operation

2016-10-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570221#comment-15570221
 ] 

Cody Koeninger commented on SPARK-15272:


Checking to see if the 0.10 consumer's handling of preferred locations 
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies
 addresses this.
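
For reference, a minimal sketch of the 0.10 usage being referred to, assuming a 
streamingContext, topics and kafkaParams defined elsewhere:

{noformat}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// PreferConsistent distributes partitions evenly across executors instead of
// preferring the Kafka brokers as task locations (use PreferBrokers only when
// brokers and executors are co-located).
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
{noformat}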

> DirectKafkaInputDStream doesn't work with window operation
> --
>
> Key: SPARK-15272
> URL: https://issues.apache.org/jira/browse/SPARK-15272
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2
>Reporter: Lubomir Nerad
>
> Using Kafka direct {{DStream}} with simple window operation like:
> {code:java}
> kafkaDStream.window(Durations.milliseconds(1),
>         Durations.milliseconds(1000))
>     .print();
> {code}
> with 1s batch duration either freezes after several seconds or lags terribly 
> (depending on cluster mode).
> This happens when Kafka brokers are not part of the Spark cluster (they are 
> on different nodes). The {{KafkaRDD}} still reports them as preferred 
> locations. This doesn't seem to be problem in non-window scenarios but with 
> window it conflicts with delay scheduling algorithm implemented in 
> {{TaskSetManager}}. It either significantly delays (Yarn mode) or completely 
> drains (Spark mode) resource offers with {{TaskLocality.ANY}} which are 
> needed to process tasks with these Kafka broker aligned preferred locations. 
> When delay scheduling algorithm is switched off ({{spark.locality.wait=0}}), 
> the example works correctly.
> I think that the {{KafkaRDD}} shouldn't report preferred locations if the 
> brokers don't correspond to worker nodes or allow the reporting of preferred 
> locations to be switched off. Also it would be good if delay scheduling 
> algorithm didn't drain / delay offers in the case, the tasks have unmatched 
> preferred locations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-15272) DirectKafkaInputDStream doesn't work with window operation

2016-10-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570221#comment-15570221
 ] 

Cody Koeninger edited comment on SPARK-15272 at 10/12/16 11:33 PM:
---

Does the 0.10 consumer's handling of preferred locations 
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies
 address this for you?


was (Author: c...@koeninger.org):
Checking to see if the 0.10 consumer's handling of preferred locations 
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies
 addresses this.

> DirectKafkaInputDStream doesn't work with window operation
> --
>
> Key: SPARK-15272
> URL: https://issues.apache.org/jira/browse/SPARK-15272
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.2
>Reporter: Lubomir Nerad
>
> Using Kafka direct {{DStream}} with simple window operation like:
> {code:java}
> kafkaDStream.window(Durations.milliseconds(1),
>         Durations.milliseconds(1000))
>     .print();
> {code}
> with 1s batch duration either freezes after several seconds or lags terribly 
> (depending on cluster mode).
> This happens when Kafka brokers are not part of the Spark cluster (they are 
> on different nodes). The {{KafkaRDD}} still reports them as preferred 
> locations. This doesn't seem to be problem in non-window scenarios but with 
> window it conflicts with delay scheduling algorithm implemented in 
> {{TaskSetManager}}. It either significantly delays (Yarn mode) or completely 
> drains (Spark mode) resource offers with {{TaskLocality.ANY}} which are 
> needed to process tasks with these Kafka broker aligned preferred locations. 
> When delay scheduling algorithm is switched off ({{spark.locality.wait=0}}), 
> the example works correctly.
> I think that the {{KafkaRDD}} shouldn't report preferred locations if the 
> brokers don't correspond to worker nodes or allow the reporting of preferred 
> locations to be switched off. Also it would be good if delay scheduling 
> algorithm didn't drain / delay offers in the case, the tasks have unmatched 
> preferred locations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-11698) Add option to ignore kafka messages that are out of limit rate

2016-10-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570208#comment-15570208
 ] 

Cody Koeninger commented on SPARK-11698:


Would a custom ConsumerStrategy for the new consumer added in SPARK-12177 allow 
you to address this issue?  You could supply a Consumer implementation that 
overrides poll
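
For illustration only (the class name and the stubbed filtering are made up; 
only the ConsumerStrategy contract itself is real), such a strategy could look 
roughly like:

{noformat}
import java.{lang => jl, util => ju}

import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategy

// Sketch: plug in a Consumer subclass whose poll can inspect or skip records
// before Spark sees them.  Checkpoint restore via currentOffsets is omitted
// for brevity; see the built-in Subscribe strategy for how that's handled.
class RateDroppingStrategy[K, V](
    topics: ju.Collection[String],
    kafkaParams: ju.Map[String, Object]) extends ConsumerStrategy[K, V] {

  override def executorKafkaParams: ju.Map[String, Object] = kafkaParams

  override def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams) {
      override def poll(timeout: Long): ConsumerRecords[K, V] = {
        val records = super.poll(timeout)
        // decide here which records to hand back (e.g. drop those over a rate limit)
        records
      }
    }
    consumer.subscribe(topics)
    consumer
  }
}
{noformat}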

> Add option to ignore kafka messages that are out of limit rate
> --
>
> Key: SPARK-11698
> URL: https://issues.apache.org/jira/browse/SPARK-11698
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Liang-Chi Hsieh
>
> With spark.streaming.kafka.maxRatePerPartition, we can control the max rate 
> limit. However, we can not ignore these messages out of limit. These messages 
> will be consumed in next iteration. We have a use case that we need to ignore 
> these messages and process latest messages in next iteration.
> In other words, we simply want to consume part of messages in each iteration 
> and ignore remaining messages that are not consumed.
> We add an option for this purpose.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context

2016-10-12 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-10320.

   Resolution: Fixed
Fix Version/s: 2.0.0

SPARK-12177  added the new consumer, which supports SubscribePattern

> Kafka Support new topic subscriptions without requiring restart of the 
> streaming context
> 
>
> Key: SPARK-10320
> URL: https://issues.apache.org/jira/browse/SPARK-10320
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Sudarshan Kadambi
> Fix For: 2.0.0
>
>
> Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe 
> to current ones once the streaming context has been started. Restarting the 
> streaming context increases the latency of update handling.
> Consider a streaming application subscribed to n topics. Let's say 1 of the 
> topics is no longer needed in streaming analytics and hence should be 
> dropped. We could do this by stopping the streaming context, removing that 
> topic from the topic list and restarting the streaming context. Since with 
> some DStreams such as DirectKafkaStream, the per-partition offsets are 
> maintained by Spark, we should be able to resume uninterrupted (I think?) 
> from where we left off with a minor delay. However, in instances where 
> expensive state initialization (from an external datastore) may be needed for 
> datasets published to all topics, before streaming updates can be applied to 
> it, it is more convenient to only subscribe or unsubscribe to the incremental 
> changes to the topic list. Without such a feature, updates go unprocessed for 
> longer than they need to be, thus affecting QoS.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-9947) Separate Metadata and State Checkpoint Data

2016-10-12 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger closed SPARK-9947.
-
Resolution: Won't Fix

The direct DStream api already gives access to offsets, and it seems clear that 
most future work on streaming checkpointing is going to be focused on 
structured streaming (SPARK-15406).
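
For reference, the offset access mentioned is the standard pattern from the 
integration guide (stream here is a direct stream created elsewhere):

{noformat}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // each batch's Kafka offsets are available straight off the RDD
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
{noformat}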

> Separate Metadata and State Checkpoint Data
> ---
>
> Key: SPARK-9947
> URL: https://issues.apache.org/jira/browse/SPARK-9947
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Problem: When updating an application that has checkpointing enabled to 
> support the updateStateByKey and 24/7 operation functionality, you encounter 
> the problem where you might like to maintain state data between restarts but 
> delete the metadata containing execution state. 
> If checkpoint data exists between code redeployment, the program may not 
> execute properly or at all. My current workaround for this issue is to wrap 
> updateStateByKey with my own function that persists the state after every 
> update to my own separate directory. (That allows me to delete the checkpoint 
> with its metadata before redeploying) Then, when I restart the application, I 
> initialize the state with this persisted data. This incurs additional 
> overhead due to persisting of the same data twice: once in the checkpoint and 
> once in my persisted data folder. 
> If Kafka Direct API offsets could be stored in another separate checkpoint 
> directory, that would help address the problem of having to blow that away 
> between code redeployment as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2016-10-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570186#comment-15570186
 ] 

Cody Koeninger commented on SPARK-8337:
---

Can this be closed, given that the subtasks are resolved and any future 
discussion of python dstream kafka support seems to be in SPARK-16534

> KafkaUtils.createDirectStream for python is lacking API/feature parity with 
> the Scala/Java version
> --
>
> Key: SPARK-8337
> URL: https://issues.apache.org/jira/browse/SPARK-8337
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Streaming
>Affects Versions: 1.4.0
>Reporter: Amit Ramesh
>Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-5505) ConsumerRebalanceFailedException from Kafka consumer

2016-10-12 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger closed SPARK-5505.
-
Resolution: Won't Fix

The old kafka High Level Consumer has been abandoned at this point.  
SPARK-12177 and SPARK-15406 use the new consumer api.

> ConsumerRebalanceFailedException from Kafka consumer
> 
>
> Key: SPARK-5505
> URL: https://issues.apache.org/jira/browse/SPARK-5505
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0
> Environment: CentOS6 / Linux 2.6.32-358.2.1.el6.x86_64
> java version "1.7.0_21"
> Scala compiler version 2.9.3
> 2 cores Intel(R) Xeon(R) CPU E5620  @ 2.40GHz / 16G RAM
> VMWare VM.
>Reporter: Greg Temchenko
>Priority: Critical
>
> From time to time Spark streaming produces a ConsumerRebalanceFailedException 
> and stops receiving messages. After that all consequential RDDs are empty.
> {code}
> 15/01/30 18:18:36 ERROR consumer.ZookeeperConsumerConnector: 
> [terran_vmname-1422670149779-243b4e10], error during syncedRebalance
> kafka.common.ConsumerRebalanceFailedException: 
> terran_vmname-1422670149779-243b4e10 can't rebalance after 4 retries
>   at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
>   at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)
> {code}
> The problem is also described in the mailing list: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Spark-streaming-consumes-from-Kafka-td19570.html
> As I understand it, this is a critical blocker for Kafka-Spark streaming production 
> use.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-5718) Add native offset management for ReliableKafkaReceiver

2016-10-12 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-5718.
---
   Resolution: Fixed
Fix Version/s: 2.0.0

SPARK-12177 added support for the native Kafka offset commit API.

> Add native offset management for ReliableKafkaReceiver
> --
>
> Key: SPARK-5718
> URL: https://issues.apache.org/jira/browse/SPARK-5718
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Saisai Shao
> Fix For: 2.0.0
>
>
> Kafka 0.8.2 supports native offset management instead of ZK, which gives better 
> performance. For now ReliableKafkaReceiver relies on ZK to manage the offsets, 
> which can become a bottleneck if the injection rate is high (once per 200ms by 
> default). So, in order to get better performance as well as to stay consistent 
> with Kafka, add native offset management to ReliableKafkaReceiver.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10815) API design: data sources and sinks

2016-10-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570139#comment-15570139
 ] 

Cody Koeninger commented on SPARK-10815:


Another unfortunate thing about the Sink API is that it only exposes batch ids, 
with no way that I'm aware of to get at (e.g. Kafka) offsets.

Access to offsets, for sinks that can take advantage of it, would be preferable: 
it's better for disaster recovery and doesn't lock you into a particular 
streaming engine.
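
To make that concrete, a custom sink today looks roughly like this (signature from 
the 2.0 internal Sink trait as I understand it; the class name is illustrative):

{code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// All the sink is handed is an opaque batchId; the Kafka offsets that
// produced `data` are not visible here, so they can't be stored downstream.
class IllustrativeSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    data.collect().foreach(row => println(s"batch $batchId: $row"))
  }
}
{code}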

> API design: data sources and sinks
> --
>
> Key: SPARK-10815
> URL: https://issues.apache.org/jira/browse/SPARK-10815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL, Streaming
>Reporter: Reynold Xin
>
> The existing (in 2.0) source/sink interface for structured streaming depends 
> on RDDs. This dependency has two issues:
> 1. The RDD interface is wide and difficult to stabilize across versions. This 
> is similar to point 1 in https://issues.apache.org/jira/browse/SPARK-15689. 
> Ideally, a source/sink implementation created for Spark 2.x should work in 
> Spark 10.x, assuming the JVM is still around.
> 2. It is difficult to swap in/out a different execution engine.
> The purpose of this ticket is to create a stable interface that addresses the 
> two issues above.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17344) Kafka 0.8 support for Structured Streaming

2016-10-11 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15567457#comment-15567457
 ] 

Cody Koeninger commented on SPARK-17344:


Given the choice between rewriting the underlying Kafka consumers and having a 
split codebase, I'd rather have a split codebase.  Of course I'd rather not 
sink development effort into an old version of Kafka at all, until the 
structured stream for 0.10 is working for my use cases.

But if you want to wrap the 0.8 RDD in a structured stream, go for it, and I'll 
help you figure out how to do it.  Seriously.  Don't expect larger project uptake, 
but if you just need something that works for you, it's doable.

> Kafka 0.8 support for Structured Streaming
> --
>
> Key: SPARK-17344
> URL: https://issues.apache.org/jira/browse/SPARK-17344
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Frederick Reiss
>
> Design and implement Kafka 0.8-based sources and sinks for Structured 
> Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-17837) Disaster recovery of offsets from WAL

2016-10-11 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger closed SPARK-17837.
--
Resolution: Duplicate

Duplicate of SPARK-17829

> Disaster recovery of offsets from WAL
> -
>
> Key: SPARK-17837
> URL: https://issues.apache.org/jira/browse/SPARK-17837
> Project: Spark
>  Issue Type: Sub-task
>Reporter: Cody Koeninger
>
> "The SQL offsets are stored in a WAL at $checkpointLocation/offsets/$batchId. 
> As reynold suggests though, we should change this to use a less opaque 
> format."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17344) Kafka 0.8 support for Structured Streaming

2016-10-11 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15566244#comment-15566244
 ] 

Cody Koeninger commented on SPARK-17344:


How long would it take CDH to distribute 0.10 if there were a compelling Spark 
client for it?

How are you going to handle SSL?

You can't avoid the complexity of caching consumers if you still want the 
benefits of prefetching, and doing an SSL handshake for every batch will kill 
performance if they aren't cached.
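
For anyone wondering what that caching amounts to, a stripped-down sketch (the real 
CachedKafkaConsumer is more involved; the names here are illustrative):

{code}
import scala.collection.mutable

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Keep one long-lived consumer per (group id, partition) on each executor, so
// the SSL handshake and metadata fetch happen once rather than on every batch.
// kafkaParams must include the usual bootstrap.servers / deserializer settings.
object ConsumerCache {
  private val cache =
    mutable.HashMap.empty[(String, TopicPartition), KafkaConsumer[Array[Byte], Array[Byte]]]

  def getOrCreate(
      groupId: String,
      tp: TopicPartition,
      kafkaParams: java.util.Map[String, Object]): KafkaConsumer[Array[Byte], Array[Byte]] =
    cache.synchronized {
      cache.getOrElseUpdate((groupId, tp), {
        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
        consumer.assign(java.util.Arrays.asList(tp))
        consumer
      })
    }
}
{code}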

Also note that this is a prime example of what I'm talking about in my dev 
mailing list discussion on SIPs.  This issue has been brought up multiple times, 
and each time the decision went against continuing support of 0.8.

You guys started making promises about structured streaming for Kafka over half 
a year ago, and it still isn't feature complete.  This is a big potential 
detour for uncertain gain.  The real underlying problem is still how you're 
going to do better than simply wrapping a DStream, and I don't see how this is 
directly relevant to that.

> Kafka 0.8 support for Structured Streaming
> --
>
> Key: SPARK-17344
> URL: https://issues.apache.org/jira/browse/SPARK-17344
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Frederick Reiss
>
> Design and implement Kafka 0.8-based sources and sinks for Structured 
> Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17853) Kafka OffsetOutOfRangeException on DStreams union from separate Kafka clusters with identical topic names.

2016-10-11 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565559#comment-15565559
 ] 

Cody Koeninger commented on SPARK-17853:


Good, I will keep this ticket open at least until the documentation is made 
clearer.




> Kafka OffsetOutOfRangeException on DStreams union from separate Kafka 
> clusters with identical topic names.
> --
>
> Key: SPARK-17853
> URL: https://issues.apache.org/jira/browse/SPARK-17853
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Marcin Kuthan
>
> During migration from Spark 1.6 to 2.0 I observed an OffsetOutOfRangeException 
> reported by the Kafka client. In our scenario we create a single DStream as a 
> union of multiple DStreams, one DStream per Kafka cluster (multi-DC solution). 
> Both Kafka clusters have the same topics and number of partitions.
> After a quick investigation, I found that the class DirectKafkaInputDStream keeps 
> offset state per topic and partition, but it is not aware of different Kafka 
> clusters. 
> For every topic, a single DStream is created as a union from all configured 
> Kafka clusters.
> {code}
> class KafkaDStreamSource(configs: Iterable[Map[String, String]]) {
>   def createSource(ssc: StreamingContext, topic: String): DStream[(String, Array[Byte])] = {
>     val streams = configs.map { config =>
>       val kafkaParams = config
>       val kafkaTopics = Set(topic)
>       KafkaUtils.createDirectStream[String, Array[Byte]](
>         ssc,
>         LocationStrategies.PreferConsistent,
>         ConsumerStrategies.Subscribe[String, Array[Byte]](kafkaTopics, kafkaParams)
>       ).map { record =>
>         (record.key, record.value)
>       }
>     }
>     ssc.union(streams.toSeq)
>   }
> }
> {code}
> In the end, offsets from one Kafka cluster overwrite offsets from the other one. 
> Fortunately an OffsetOutOfRangeException was thrown, because the offsets in the 
> two Kafka clusters are significantly different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17853) Kafka OffsetOutOfRangeException on DStreams union from separate Kafka clusters with identical topic names.

2016-10-11 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565400#comment-15565400
 ] 

Cody Koeninger commented on SPARK-17853:


Use a different group.id for each cluster's stream.

Let me know if that addresses the issue.
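
i.e. something along these lines when building the per-cluster configs for the 
KafkaDStreamSource wrapper from the description (values are illustrative):

{code}
// Give each cluster's stream its own consumer group so committed offsets
// for the two clusters can never collide. Add the usual deserializer settings.
val clusterDc1 = Map[String, String](
  "bootstrap.servers" -> "kafka-dc1:9092",
  "group.id" -> "myapp-dc1")

val clusterDc2 = Map[String, String](
  "bootstrap.servers" -> "kafka-dc2:9092",
  "group.id" -> "myapp-dc2")

val source = new KafkaDStreamSource(Seq(clusterDc1, clusterDc2))
{code}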




> Kafka OffsetOutOfRangeException on DStreams union from separate Kafka 
> clusters with identical topic names.
> --
>
> Key: SPARK-17853
> URL: https://issues.apache.org/jira/browse/SPARK-17853
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Marcin Kuthan
>
> During migration from Spark 1.6 to 2.0 I observed an OffsetOutOfRangeException 
> reported by the Kafka client. In our scenario we create a single DStream as a 
> union of multiple DStreams, one DStream per Kafka cluster (multi-DC solution). 
> Both Kafka clusters have the same topics and number of partitions.
> After a quick investigation, I found that the class DirectKafkaInputDStream keeps 
> offset state per topic and partition, but it is not aware of different Kafka 
> clusters. 
> For every topic, a single DStream is created as a union from all configured 
> Kafka clusters.
> {code}
> class KafkaDStreamSource(configs: Iterable[Map[String, String]]) {
>   def createSource(ssc: StreamingContext, topic: String): DStream[(String, Array[Byte])] = {
>     val streams = configs.map { config =>
>       val kafkaParams = config
>       val kafkaTopics = Set(topic)
>       KafkaUtils.createDirectStream[String, Array[Byte]](
>         ssc,
>         LocationStrategies.PreferConsistent,
>         ConsumerStrategies.Subscribe[String, Array[Byte]](kafkaTopics, kafkaParams)
>       ).map { record =>
>         (record.key, record.value)
>       }
>     }
>     ssc.union(streams.toSeq)
>   }
> }
> {code}
> In the end, offsets from one Kafka cluster overwrite offsets from the other one. 
> Fortunately an OffsetOutOfRangeException was thrown, because the offsets in the 
> two Kafka clusters are significantly different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17853) Kafka OffsetOutOfRangeException on DStreams union from separate Kafka clusters with identical topic names.

2016-10-11 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565278#comment-15565278
 ] 

Cody Koeninger commented on SPARK-17853:


Which version of DStream are you using, 0-10 or 0-8?
Are you using the same group id for both streams?

> Kafka OffsetOutOfRangeException on DStreams union from separate Kafka 
> clusters with identical topic names.
> --
>
> Key: SPARK-17853
> URL: https://issues.apache.org/jira/browse/SPARK-17853
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Marcin Kuthan
>
> During migration from Spark 1.6 to 2.0 I observed an OffsetOutOfRangeException 
> reported by the Kafka client. In our scenario we create a single DStream as a 
> union of multiple DStreams, one DStream per Kafka cluster (multi-DC solution). 
> Both Kafka clusters have the same topics and number of partitions.
> After a quick investigation, I found that the class DirectKafkaInputDStream keeps 
> offset state per topic and partition, but it is not aware of different Kafka 
> clusters. 
> For every topic, a single DStream is created as a union from all configured 
> Kafka clusters.
> {code}
> class KafkaDStreamSource(configs: Iterable[Map[String, String]]) {
>   def createSource(ssc: StreamingContext, topic: String): DStream[(String, Array[Byte])] = {
>     val streams = configs.map { config =>
>       val kafkaParams = config
>       val kafkaTopics = Set(topic)
>       KafkaUtils.createDirectStream[String, Array[Byte]](
>         ssc,
>         LocationStrategies.PreferConsistent,
>         ConsumerStrategies.Subscribe[String, Array[Byte]](kafkaTopics, kafkaParams)
>       ).map { record =>
>         (record.key, record.value)
>       }
>     }
>     ssc.union(streams.toSeq)
>   }
> }
> {code}
> In the end, offsets from one Kafka cluster overwrite offsets from the other one. 
> Fortunately an OffsetOutOfRangeException was thrown, because the offsets in the 
> two Kafka clusters are significantly different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15563285#comment-15563285
 ] 

Cody Koeninger commented on SPARK-17812:


No, it's not covered by a strict Assign.  If you don't have this, you're 
basically saying you can never have well-defined starting offsets for a job 
that starts with SubscribePattern.

The existing DStream already supports this, because it doesn't conflate 
auto.offset.reset with user-specified offsets.
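
For reference, this is what I mean: the 0-10 DStream already lets you give explicit 
offsets for some partitions and fall back to auto.offset.reset for the rest of a 
pattern subscription (a minimal sketch; topic names and offsets are made up):

{code}
import java.util.regex.Pattern

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Partitions listed in knownOffsets start exactly there; any other partition
// matched by the pattern starts according to auto.offset.reset.
def patternStreamWithKnownOffsets(
    ssc: StreamingContext,
    kafkaParams: Map[String, Object]): InputDStream[ConsumerRecord[String, String]] = {
  val knownOffsets = Map(
    new TopicPartition("events.important", 0) -> 12345L,
    new TopicPartition("events.important", 1) -> 23456L)
  KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.SubscribePattern[String, String](
      Pattern.compile("events\\..*"), kafkaParams, knownOffsets))
}
{code}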

> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latest offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560925#comment-15560925
 ] 

Cody Koeninger commented on SPARK-17812:


I want to start a pattern subscription at known good offsets for a
particular topic, and latest available for other topics that match.




> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latest offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560839#comment-15560839
 ] 

Cody Koeninger commented on SPARK-17812:


That totally kills the usability of SubscribePattern.

> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latest offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17815) Report committed offsets

2016-10-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560837#comment-15560837
 ] 

Cody Koeninger commented on SPARK-17815:


My personal concerns about complexity are because I'm the one who answers 
nearly every email on the Spark user list regarding the Kafka integration.

My business need relative to this ticket is that any Kafka integration I use 
must give me a way to transactionally store offsets and results.  If those 
downstream offsets aren't used as at least a source of truth, then that goal 
isn't met.  If this ticket is implicitly saying the WAL is the only source of 
truth, that's a problem for me.

The SIP process is what it is or may not be, we're where we are, having the 
discussion we're having now, and I'll leave it at that ;) 
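
To be concrete, the pattern I need to keep working is roughly this (table names, 
schema, and connection string are illustrative):

{code}
import java.sql.DriverManager

// Store a batch's results and its ending offsets in one local transaction, so
// the downstream database itself is the source of truth for where to resume.
def storeBatch(results: Seq[(String, Int)], offsets: Seq[(String, Int, Long)]): Unit = {
  val conn = DriverManager.getConnection("jdbc:postgresql://localhost/mydb")
  try {
    conn.setAutoCommit(false)
    val insertResult = conn.prepareStatement("insert into results (k, v) values (?, ?)")
    results.foreach { case (k, v) =>
      insertResult.setString(1, k)
      insertResult.setInt(2, v)
      insertResult.executeUpdate()
    }
    val updateOffset = conn.prepareStatement(
      "update kafka_offsets set until_offset = ? where topic = ? and kafka_partition = ?")
    offsets.foreach { case (topic, partition, until) =>
      updateOffset.setLong(1, until)
      updateOffset.setString(2, topic)
      updateOffset.setInt(3, partition)
      updateOffset.executeUpdate()
    }
    conn.commit()  // results and offsets become visible atomically, or not at all
  } catch {
    case e: Exception =>
      conn.rollback()
      throw e
  } finally {
    conn.close()
  }
}
{code}

On restart, the starting offsets come from the kafka_offsets table, not from a 
checkpoint or a commit topic.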

> Report committed offsets
> 
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit.  However, 
> this means that external tools are not able to report on how far behind a 
> given streaming job is.  When the user manually gives us a group.id, we 
> should report back to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560719#comment-15560719
 ] 

Cody Koeninger commented on SPARK-17812:


Generally agree with the direction of what you're saying, but the question of 
where to start for partitions that don't have offsets specified is orthogonal 
to the question of where/how to specify offsets for particular partitions.  
It's not just 4 options.

> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latest offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17815) Report committed offsets

2016-10-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560684#comment-15560684
 ] 

Cody Koeninger commented on SPARK-17815:


Regarding Kafka consumer behavior, I'm not saying it's impossible; I'm
saying it A. needs attention, especially since what's in master isn't
complete, and B. may confuse users who expect the offsets in Kafka to
determine where the consumer starts.

Regarding the WAL, DStream checkpoints were in HDFS too. Being on HDFS doesn't
guarantee against getting corrupted.
And in any case, the problem of aligning boundaries across multiple different
offset stores still applies.




> Report committed offsets
> 
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit.  However, 
> this means that external tools are not able to report on how far behind a 
> given streaming job is.  When the user manually gives us a group.id, we 
> should report back to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17841) Kafka 0.10 commitQueue needs to be drained

2016-10-09 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-17841:
--

 Summary: Kafka 0.10 commitQueue needs to be drained
 Key: SPARK-17841
 URL: https://issues.apache.org/jira/browse/SPARK-17841
 Project: Spark
  Issue Type: Bug
Reporter: Cody Koeninger


The current implementation just iterates over the queue instead of polling and removing entries.
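
Roughly the fix (a sketch, assuming the queue is a ConcurrentLinkedQueue of 
OffsetRange as in the 0-10 DStream):

{code}
import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

// Drain by polling so processed entries are actually removed; merely iterating
// leaves them in the queue to be committed again on every subsequent batch.
def drainCommitQueue(queue: ConcurrentLinkedQueue[OffsetRange]): Map[TopicPartition, Long] = {
  var drained = Map.empty[TopicPartition, Long]
  var range = queue.poll()
  while (range != null) {
    val tp = range.topicPartition()
    // keep the largest ending offset seen for each partition
    drained += tp -> math.max(range.untilOffset, drained.getOrElse(tp, range.untilOffset))
    range = queue.poll()
  }
  drained
}
{code}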



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets

2016-10-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560241#comment-15560241
 ] 

Cody Koeninger commented on SPARK-17147:


[~graphex] My WIP is at https://github.com/koeninger/spark-1/tree/SPARK-17147  
It's still really rough, but it's passing the basic test I set up.  Let me know 
when you have time to try it out and we can iterate.
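
For anyone following along, the gist of the change is below (a simplified sketch of 
the consumer-side loop, not the actual WIP code; class and parameter names are made 
up):

{code}
import java.util.Collections

import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// With log compaction, offsets can have gaps. Instead of asserting that each
// record arrives at exactly nextOffset, advance nextOffset from the offset the
// broker actually returned. Error handling and retries are omitted.
class GapTolerantIterator(
    consumer: KafkaConsumer[String, String],
    tp: TopicPartition,
    var nextOffset: Long,
    untilOffset: Long,
    pollTimeoutMs: Long) extends Iterator[ConsumerRecord[String, String]] {

  private var buffer = Collections.emptyIterator[ConsumerRecord[String, String]]()

  override def hasNext: Boolean = nextOffset < untilOffset

  override def next(): ConsumerRecord[String, String] = {
    if (!buffer.hasNext) {
      consumer.seek(tp, nextOffset)
      buffer = consumer.poll(pollTimeoutMs).records(tp).iterator()
    }
    val record = buffer.next()
    nextOffset = record.offset + 1  // not nextOffset + 1: tolerate gaps
    record
  }
}
{code}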

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
> 
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Robert Conrad
>
> When Kafka does log compaction offsets often end up with gaps, meaning the 
> next requested offset will be frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17815) Report committed offsets

2016-10-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15559911#comment-15559911
 ] 

Cody Koeninger commented on SPARK-17815:


The WAL cannot be the only source of truth, because it can be corrupted in a 
situation where the downstream results and offsets are not.  The downstream 
offsets, by contrast, can't be corrupted without also affecting the results; that's 
the whole point of transactions.  Even if you do ignore the fact that the WAL 
can be corrupted, you still have to be careful about aligning boundaries of the 
WAL with boundaries of the downstream store.
The Kafka commit log can't be dismissed as merely for metric collection either. A 
Kafka consumer is going to use it in preference to auto.offset.reset as the 
starting point for a newly constructed consumer.
I'm not saying these issues are unsolvable, but you can't just handwave them 
away, and they are confusing to end users. There was already confusion with 
only 2 stores - ZK and the DStream checkpoint.

> Report committed offsets
> 
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit.  However, 
> this means that external tools are not able to report on how far behind a 
> given streaming job is.  When the user manually gives us a group.id, we 
> should report back to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets

2016-10-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558572#comment-15558572
 ] 

Cody Koeninger commented on SPARK-17147:


I talked with Sean in person about this, and think there's a way to move 
forward.  I'll start hacking on it.

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
> 
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Robert Conrad
>
> When Kafka does log compaction offsets often end up with gaps, meaning the 
> next requested offset will be frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4960) Interceptor pattern in receivers

2016-10-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558557#comment-15558557
 ] 

Cody Koeninger commented on SPARK-4960:
---

Is this idea pretty much dead at this point? It seems like most attention has 
moved off of the receiver-based DStream.
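
For what it's worth, with the direct stream there is no receiver/BlockManager step 
in the middle, so an ordinary transformation already covers the interception use 
case (a trivial sketch, not anything in the API):

{code}
import scala.reflect.ClassTag

import org.apache.spark.streaming.dstream.DStream

// Drop or rewrite messages before any further processing; returning None
// filters the message out, Some(u) passes the transformed value along.
def intercept[T, U: ClassTag](stream: DStream[T])(f: T => Option[U]): DStream[U] =
  stream.flatMap(m => f(m).toSeq)
{code}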

> Interceptor pattern in receivers
> 
>
> Key: SPARK-4960
> URL: https://issues.apache.org/jira/browse/SPARK-4960
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Tathagata Das
>
> Sometimes it is good to intercept a message received through a receiver and 
> modify / do something with the message before it is stored into Spark. This 
> is often referred to as the interceptor pattern. There should be a general way 
> to specify an interceptor function that gets applied to all receivers. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-3146) Improve the flexibility of Spark Streaming Kafka API to offer user the ability to process message before storing into BM

2016-10-08 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-3146.
---
   Resolution: Fixed
Fix Version/s: 1.3.0

> Improve the flexibility of Spark Streaming Kafka API to offer user the 
> ability to process message before storing into BM
> 
>
> Key: SPARK-3146
> URL: https://issues.apache.org/jira/browse/SPARK-3146
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Saisai Shao
> Fix For: 1.3.0
>
>
> Currently the Spark Streaming Kafka API stores the key and value of each message 
> into BM for processing; this potentially loses flexibility for 
> different requirements:
> 1. Currently the topic/partition/offset information for each message is discarded 
> by KafkaInputDStream. In some scenarios people may need this information to 
> better filter the messages, as SPARK-2388 describes.
> 2. People may need to add a timestamp to each message when feeding it into Spark 
> Streaming, to better measure system latency.
> 3. Checkpointing the partition/offsets, or others...
> So here we add a messageHandler to the interface to give people the flexibility 
> to preprocess the message before storing it into BM. In the meantime, this 
> improvement stays compatible with the current API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3146) Improve the flexibility of Spark Streaming Kafka API to offer user the ability to process message before storing into BM

2016-10-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558550#comment-15558550
 ] 

Cody Koeninger commented on SPARK-3146:
---

SPARK-4964 / the direct stream added a messageHandler.


> Improve the flexibility of Spark Streaming Kafka API to offer user the 
> ability to process message before storing into BM
> 
>
> Key: SPARK-3146
> URL: https://issues.apache.org/jira/browse/SPARK-3146
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Saisai Shao
>
> Currently the Spark Streaming Kafka API stores the key and value of each message 
> into BM for processing; this potentially loses flexibility for 
> different requirements:
> 1. Currently the topic/partition/offset information for each message is discarded 
> by KafkaInputDStream. In some scenarios people may need this information to 
> better filter the messages, as SPARK-2388 describes.
> 2. People may need to add a timestamp to each message when feeding it into Spark 
> Streaming, to better measure system latency.
> 3. Checkpointing the partition/offsets, or others...
> So here we add a messageHandler to the interface to give people the flexibility 
> to preprocess the message before storing it into BM. In the meantime, this 
> improvement stays compatible with the current API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17837) Disaster recovery of offsets from WAL

2016-10-08 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-17837:
---
Summary: Disaster recovery of offsets from WAL  (was: Disaster recover of 
offsets from WAL)

> Disaster recovery of offsets from WAL
> -
>
> Key: SPARK-17837
> URL: https://issues.apache.org/jira/browse/SPARK-17837
> Project: Spark
>  Issue Type: Sub-task
>Reporter: Cody Koeninger
>
> "The SQL offsets are stored in a WAL at $checkpointLocation/offsets/$batchId. 
> As reynold suggests though, we should change this to use a less opaque 
> format."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17837) Disaster recover of offsets from WAL

2016-10-08 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-17837:
--

 Summary: Disaster recover of offsets from WAL
 Key: SPARK-17837
 URL: https://issues.apache.org/jira/browse/SPARK-17837
 Project: Spark
  Issue Type: Sub-task
Reporter: Cody Koeninger


"The SQL offsets are stored in a WAL at $checkpointLocation/offsets/$batchId. 
As reynold suggests though, we should change this to use a less opaque format."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17815) Report committed offsets

2016-10-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558528#comment-15558528
 ] 

Cody Koeninger commented on SPARK-17815:


So if you start committing offsets to Kafka, there are going to be potentially 
three places offsets are stored:

1. the structured streaming WAL
2. the Kafka commit topic
3. the downstream store

It's going to be easy to get confused as to which one is the source of truth.


> Report committed offsets
> 
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit.  However, 
> this means that external tools are not able to report on how far behind a 
> given streaming job is.  When the user manually gives us a group.id, we 
> should report back to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558506#comment-15558506
 ] 

Cody Koeninger commented on SPARK-17812:


So I'm willing to do this work, mostly because I've already done it, but there 
are some user interface issues here that need to get figured out.

You already chose the name "startingOffset" for specifying the equivalent of 
auto.offset.reset.  Now we're looking at actually adding starting offsets.  
Furthermore, it should be possible to specify starting offsets for some 
partitions, while relying on the equivalent of auto.offset.reset for other 
unspecified ones (the existing DStream does this).

What are you expecting configuration of this to look like?  I can see a couple 
of options:

1. Try to cram everything into startingOffset with some horrible string-based 
DSL
2. Have a separate option for specifying starting offsets for real, with a name 
that makes it clear what it is, yet doesn't use "startingoffset".  As for the 
value, I guess in json form of some kind?   { "topicfoo" : { "0": 1234, "1": 
4567 }}

Somewhat related: Assign also needs a way of specifying TopicPartitions.

As for the idea of seeking back X offsets, I think it'd be better to look at 
offset time indexing.
If you do go with the seek-back-X-offsets idea, note that the offsets -1L and -2L 
already have special meaning, so it's going to be confusing to allow negative 
numbers in an interface that specifies offsets.
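
If option 2 wins, parsing is straightforward; a sketch of turning that JSON into 
per-partition starting offsets (the option name is still TBD, and json4s here is 
just an example library choice):

{code}
import org.apache.kafka.common.TopicPartition
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

// Turn {"topicfoo": {"0": 1234, "1": 4567}} into a Map[TopicPartition, Long].
// Partitions not listed would fall back to the auto.offset.reset equivalent.
def parseStartingOffsets(json: String): Map[TopicPartition, Long] = {
  implicit val formats = DefaultFormats
  parse(json).extract[Map[String, Map[String, Long]]].flatMap {
    case (topic, partitionOffsets) =>
      partitionOffsets.map { case (partition, offset) =>
        new TopicPartition(topic, partition.toInt) -> offset
      }
  }
}
{code}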


> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latest offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17344) Kafka 0.8 support for Structured Streaming

2016-10-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558474#comment-15558474
 ] 

Cody Koeninger commented on SPARK-17344:


I think this is premature until you have a fully operational battlestation, er, 
structured stream, that has all the necessary features for 0.10.

Regarding the conversation with Michael about possibly using the Kafka protocol 
directly as a way to work around the differences between 0.8 and 0.10, please 
don't consider that.  Every Kafka consumer implementation I've ever used has 
bugs, and we don't need to spend time writing another buggy one.  

By contrast, writing a streaming source shim around the existing simple 
consumer-based 0.8 Spark RDD would be a weekend project; it just wouldn't have 
features like SSL, dynamic topics, or offset committing.

> Kafka 0.8 support for Structured Streaming
> --
>
> Key: SPARK-17344
> URL: https://issues.apache.org/jira/browse/SPARK-17344
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Frederick Reiss
>
> Design and implement Kafka 0.8-based sources and sinks for Structured 
> Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-10-06 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15554160#comment-15554160
 ] 

Cody Koeninger commented on SPARK-15406:


I think if you're already gravitating towards JSON as the lowest common 
denominator for the configuration interface, it makes sense to use that for the log 
as well.

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for the building a Kafka source 
> for Structured Streaming. Here is the design doc for an initial version of 
> the Kafka Source.
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-10-06 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15554139#comment-15554139
 ] 

Cody Koeninger commented on SPARK-15406:


When something has gone wrong, how do I, as an end user, get Kafka offsets out 
of a structured streaming checkpoint?  Honest question: is there a way to do 
it?  I did look, but probably not thoroughly enough.

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for the building a Kafka source 
> for Structured Streaming. Here is the design doc for an initial version of 
> the Kafka Source.
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-10-06 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553955#comment-15553955
 ] 

Cody Koeninger edited comment on SPARK-15406 at 10/7/16 3:20 AM:
-

As soon as you say "checkpoint", a whole class of people who were burned by the 
problems with DStream checkpoints are going to tune out.  I don't trust 
anything unless I can get my hands on the offsets.

Regarding exactly once, this has two components, producing idempotently to 
kafka, and storing idempotently/transactionally downstream.  Specifically 
regarding producing to Kafka, I was talking with Jay Kreps about this 
yesterday.  There have been a lot of ideas over the years (e.g. 
put-with-expected-offset).  The top contender at this point is apparently 
something along the lines of a TCP/IP-style sequence number on the producer side 
allowing for repeated writes to be ignored on the broker side.  The KIP doc for 
it should be coming "soon".  Specifically regarding downstream write, without 
an actual implementation for real downstream systems and no way to get offsets 
except for individual messages, what exists currently couldn't even be tested 
in production at my current gig.  I could probably pretty quickly write a JDBC 
sink and a Citus sink, but those seem like they would require significant 
knowledge of and/or assumptions about the job.
do you still want to assume things like a particular table format for outputs 
and offsets?  That those sinks only work with Kafka?


was (Author: c...@koeninger.org):
As soon as you say "checkpoint", a whole class of people who were burned by the 
problems with DStream checkpoints are going to tune out.  I don't trust 
anything unless I can get my hands on the offsets.

Regarding exactly once, this has two components, producing idempotently to 
kafka, and storing idempotently/transactionally downstream.  Specifically 
regarding producing to Kafka, I was talking with Jay Kreps about this 
yesterday.  There have been a lot of ideas over the years (e.g. 
put-with-expected-offset).  The top contender at this point is apparently 
something along the lines of a tcp/ip sequence number on the producer side 
allowing for repeated writes to be ignored on the broker side.  The KIP doc for 
it should be coming "soon".  Specifically regarding downstream write, without 
an actual implementation for real downstream systems and no way to get offsets 
except for individual messages, what exists currently couldn't even be tested 
in production at my current gig.  I could probably pretty quickly write a jdbc 
sink and a Citus sink, but those seem like they would require significant 
knowledge and / or assumptions about the job.  Having a schema helps a lot, but 
do you still want to assume things like a particular table format for outputs?  
That those sinks only work with Kafka?

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for the building a Kafka source 
> for Structured Streaming. Here is the design doc for an initial version of 
> the Kafka Source.
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


