[jira] [Comment Edited] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source
[ https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15679472#comment-15679472 ]

Cody Koeninger edited comment on SPARK-18475 at 11/19/16 4:02 PM:
--

Yes, an RDD does have an ordering guarantee, it's an iterator per partition, same as Kafka. Yes, that guarantee is part of the Kafka data model (Burak, if you don't believe me, go reread http://kafka.apache.org/documentation.html#introduction and search for "order"). Because the direct stream (and the structured stream that uses the same model) has a 1:1 correspondence between Kafka partition and Spark partition, that guarantee is preserved. The existing distortions between the Kafka model and the direct stream / structured stream are enough as it is, we don't need to add more.

was (Author: c...@koeninger.org):
Yes, an RDD does have an ordering guarantee, it's an iterator per partition, same as Kafka. Yes, that guarantee is part of the Kafka data model (Burak, if you don't believe me, go reread http://kafka.apache.org/documentation.html#introduction search for "order"). Because the direct stream (and the structured stream that uses the same model) has a 1:! correspondence between kafka partition and spark partition, that guarantee is preserved. The existing distortions between the Kafka model and the direct stream / structured stream are enough as it is, we don't need to add more.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.0.2, 2.1.0
> Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we shouldn't be able to increase parallelism further, i.e. have multiple Spark tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" for what it is defined for (being cached) in this use case, but the extra overhead is worth handling data skew and increasing parallelism especially in ETL use cases.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source
[ https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15679472#comment-15679472 ]

Cody Koeninger commented on SPARK-18475:

Yes, an RDD does have an ordering guarantee, it's an iterator per partition, same as Kafka. Yes, that guarantee is part of the Kafka data model (Burak, if you don't believe me, go reread http://kafka.apache.org/documentation.html#introduction and search for "order"). Because the direct stream (and the structured stream that uses the same model) has a 1:1 correspondence between Kafka partition and Spark partition, that guarantee is preserved. The existing distortions between the Kafka model and the direct stream / structured stream are enough as it is, we don't need to add more.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.0.2, 2.1.0
> Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we shouldn't be able to increase parallelism further, i.e. have multiple Spark tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" for what it is defined for (being cached) in this use case, but the extra overhead is worth handling data skew and increasing parallelism especially in ETL use cases.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source
[ https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674459#comment-15674459 ]

Cody Koeninger commented on SPARK-18475:

This has come up several times, and my answer is consistently the same - as Ofir said, the Kafka model is parallelism bounded by number of partitions. Breaking that model breaks user expectations, e.g. concerning ordering. It's fine for you if this helps your specific use case, but I think it is not appropriate for general use. I'd recommend people fix their skew and/or repartition at the producer level.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.0.2, 2.1.0
> Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we shouldn't be able to increase parallelism further, i.e. have multiple Spark tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" for what it is defined for (being cached) in this use case, but the extra overhead is worth handling data skew and increasing parallelism especially in ETL use cases.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
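[Editor's note] "Fix their skew and/or repartition at the producer level" can be sketched as follows. This is an illustrative example, not code from the ticket: the topic name, broker address, and the salting helper are placeholders, and note the trade-off called out in the comment.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hedged sketch: spread a hot key across several partitions by salting it at
// the producer, instead of having multiple Spark tasks read one TopicPartition.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)

// Caveat: salting deliberately gives up per-key ordering, which is exactly the
// guarantee discussed above - only do this for keys where that is acceptable.
def saltedKey(key: String, buckets: Int): String =
  s"$key-${scala.util.Random.nextInt(buckets)}"

producer.send(new ProducerRecord("events", saltedKey("hotKey", 8), "payload"))
producer.close()
```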
[jira] [Commented] (SPARK-18386) Batch mode SQL source for Kafka
[ https://issues.apache.org/jira/browse/SPARK-18386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654308#comment-15654308 ]

Cody Koeninger commented on SPARK-18386:

That should work. There may be dependency conflicts trying to put a 0.10.1 jar in the same job as a 0.10.0, though.

> Batch mode SQL source for Kafka
> ---
>
> Key: SPARK-18386
> URL: https://issues.apache.org/jira/browse/SPARK-18386
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Cody Koeninger
>
> An SQL equivalent to the DStream KafkaUtils.createRDD would be useful for querying over a defined batch of offsets.
> The possibility of Kafka 0.10.1 time indexing (e.g. a batch from timestamp X to timestamp Y) should be taken into account, even if not available in the initial implementation.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.1.0
[ https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654295#comment-15654295 ]

Cody Koeninger commented on SPARK-18057:

I definitely do not want another copy-paste situation, we've already got too many of them. I'm hoping that 0.10.1 is close enough to 0.10.0 that dependency issues can be worked out in a more satisfactory way (e.g. kafka is marked as provided, the 0.10.1 integration jar depends on the 0.10 integration jar and just adds methods for time indexing) but I haven't really had time to look at it.

> Update structured streaming kafka from 10.0.1 to 10.1.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18386) Batch mode SQL source for Kafka
Cody Koeninger created SPARK-18386:
--

Summary: Batch mode SQL source for Kafka
Key: SPARK-18386
URL: https://issues.apache.org/jira/browse/SPARK-18386
Project: Spark
Issue Type: Improvement
Components: SQL
Reporter: Cody Koeninger

An SQL equivalent to the DStream KafkaUtils.createRDD would be useful for querying over a defined batch of offsets.

The possibility of Kafka 0.10.1 time indexing (e.g. a batch from timestamp X to timestamp Y) should be taken into account, even if not available in the initial implementation.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
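[Editor's note] One possible shape for the batch source proposed above - purely hypothetical at the time of this ticket, so option names, the offsets-json format, and the broker address are illustrative assumptions rather than an existing API:

```scala
// Hypothetical sketch of a batch SQL read over a defined range of offsets,
// the SQL analogue of KafkaUtils.createRDD. Not an API that existed when
// this ticket was filed; all option names here are assumptions.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  // explicit per-partition offset ranges, as json: partition -> offset
  .option("startingOffsets", """{"mytopic":{"0":100,"1":250}}""")
  .option("endingOffsets", """{"mytopic":{"0":200,"1":350}}""")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()
```

A time-indexed variant (batch from timestamp X to timestamp Y, per the ticket) would presumably swap the offset-json options for timestamp options once Kafka 0.10.1's time index is available.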
[jira] [Commented] (SPARK-18371) Spark Streaming backpressure bug - generates a batch with large number of records
[ https://issues.apache.org/jira/browse/SPARK-18371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649638#comment-15649638 ]

Cody Koeninger commented on SPARK-18371:

Thanks for digging into this. The other thing I noticed when working on https://github.com/apache/spark/pull/15132 is that the return value of getLatestRate was cast to Int, which seems wrong and possibly subject to overflow. If you have the ability to test that PR (shouldn't require a spark redeploy, since the kafka jar is standalone), may want to test it out.

> Spark Streaming backpressure bug - generates a batch with large number of records
> -
>
> Key: SPARK-18371
> URL: https://issues.apache.org/jira/browse/SPARK-18371
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.0.0
> Reporter: mapreduced
> Attachments: GiantBatch2.png, GiantBatch3.png, Giant_batch_at_23_00.png, Look_at_batch_at_22_14.png
>
> When the streaming job is configured with backpressureEnabled=true, it generates a GIANT batch of records if the processing time + scheduled delay is (much) larger than batchDuration. This creates a backlog of records like no other and results in batches queueing for hours until it chews through this giant batch.
> Expectation is that it should reduce the number of records per batch in some time to whatever it can really process.
> Attaching some screen shots where it seems that this issue is quite easily reproducible.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
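[Editor's note] The Int-cast concern in the comment above is easy to demonstrate in isolation. The rate value below is made up for illustration; the point is only what `toInt` does to a Long past Int.MaxValue:

```scala
// Why casting a Long rate to Int is suspect: values beyond Int.MaxValue
// (2147483647) wrap around to negative numbers, which would make a computed
// rate limit meaningless.
val rate: Long = 3000000000L      // hypothetical rate in records/sec
val truncated: Int = rate.toInt   // wraps: 3000000000 - 2^32 = -1294967296
println(truncated)                // prints -1294967296
```

A negative "max rate" downstream could then disable or invert whatever limit the backpressure logic meant to apply, which is consistent with the giant-batch symptom reported in this ticket.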
[jira] [Commented] (SPARK-18258) Sinks need access to offset representation
[ https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15638555#comment-15638555 ]

Cody Koeninger commented on SPARK-18258:

Sure, added, let me know if I'm missing something or can clarify.

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.
> I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.
> After SPARK-17829 is complete and offsets have a .json method, an api for this ticket might look like
> {code}
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
> }
> {code}
> where start and end were provided by StreamExecution.runBatch using committedOffsets and availableOffsets.
> I'm not 100% certain that the offsets in the seq could always be mapped back to the correct source when restarting complicated multi-source jobs, but I think it'd be sufficient. Passing the string/json representation of the seq instead of the seq itself would probably be sufficient as well, but the convention of rendering a None as "-" in the json is maybe a little idiosyncratic to parse, and the constant defining that is private.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
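[Editor's note] The "offset identifier in the same transaction as results" pattern the ticket is after can be sketched with plain JDBC. Everything here is illustrative: the table names, the row shape, and the assumption that the sink receives the ending offsets as a json string.

```scala
import java.sql.DriverManager

// Hedged sketch of a transactional sink, assuming addBatch were handed the
// ending offset representation (endOffsetsJson) as proposed in this ticket.
// Connection string and tables ("results", "stream_progress") are placeholders.
def addBatch(batchId: Long, rows: Seq[(String, Long)], endOffsetsJson: String): Unit = {
  val conn = DriverManager.getConnection("jdbc:postgresql://localhost/db")
  try {
    conn.setAutoCommit(false)
    val insert = conn.prepareStatement("INSERT INTO results (k, v) VALUES (?, ?)")
    rows.foreach { case (k, v) =>
      insert.setString(1, k); insert.setLong(2, v); insert.addBatch()
    }
    insert.executeBatch()
    // store the actual offsets next to the batch id, so recovery does not
    // depend on Spark's checkpoint directory surviving
    val off = conn.prepareStatement(
      "UPDATE stream_progress SET batch_id = ?, offsets = ?")
    off.setLong(1, batchId); off.setString(2, endOffsetsJson)
    off.executeUpdate()
    conn.commit() // results and offsets become visible atomically
  } finally conn.close()
}
```

If the transaction fails, neither results nor offsets advance, which is the exactly-once property the first sentence of the description is describing.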
[jira] [Updated] (SPARK-18258) Sinks need access to offset representation
[ https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cody Koeninger updated SPARK-18258:
---

Description:
Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.

I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.

After SPARK-17829 is complete and offsets have a .json method, an api for this ticket might look like

{code}
trait Sink {
  def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
}
{code}

where start and end were provided by StreamExecution.runBatch using committedOffsets and availableOffsets.

I'm not 100% certain that the offsets in the seq could always be mapped back to the correct source when restarting complicated multi-source jobs, but I think it'd be sufficient. Passing the string/json representation of the seq instead of the seq itself would probably be sufficient as well, but the convention of rendering a None as "-" in the json is maybe a little idiosyncratic to parse, and the constant defining that is private.

was:
Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.

I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.
> I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.
> After SPARK-17829 is complete and offsets have a .json method, an api for this ticket might look like
> {code}
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
> }
> {code}
> where start and end were provided by StreamExecution.runBatch using committedOffsets and availableOffsets.
> I'm not 100% certain that the offsets in the seq could always be mapped back to the correct source when restarting complicated multi-source jobs, but I think it'd be sufficient. Passing the string/json representation of the seq instead of the seq itself would probably be sufficient as well, but the convention of rendering a None as "-" in the json is maybe a little idiosyncratic to parse, and the constant defining that is private.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (SPARK-18258) Sinks need access to offset representation
[ https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15637621#comment-15637621 ]

Cody Koeninger commented on SPARK-18258:

So one obvious one is that if wherever checkpoint data is being stored fails or is corrupted, my downstream database can still be fine and have correct results, yet I have no way of restarting the job from a known point because the batch id stored in the database is now meaningless.

Basically, I do not want to introduce another N points of failure in between Kafka and my data store.

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.
> I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18258) Sinks need access to offset representation
[ https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15637576#comment-15637576 ]

Cody Koeninger commented on SPARK-18258:

The sink doesn't have to reason about equality of the representations. It just has to be able to store those representations, in addition to the batch id if necessary, so that the job can be recovered if spark fails in a way that renders the batch id meaningless or the user wants to switch to a different streaming system.

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.
> I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18272) Test topic addition for subscribePattern on Kafka DStream and Structured Stream
Cody Koeninger created SPARK-18272:
--

Summary: Test topic addition for subscribePattern on Kafka DStream and Structured Stream
Key: SPARK-18272
URL: https://issues.apache.org/jira/browse/SPARK-18272
Project: Spark
Issue Type: Bug
Components: DStreams, Structured Streaming
Reporter: Cody Koeninger

We've had reports of the following sequence:
- create subscribePattern stream that doesn't match any existing topics at the time stream starts
- add a topic that matches pattern
- expect that messages from that topic show up, but they don't

We don't seem to actually have tests that cover this case, so we should add them

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
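[Editor's note] A rough outline of the missing test, mostly as comments. The helper names (testUtils.createTopic, testUtils.sendMessages) follow the pattern of Spark's kafka010 test suites, but the exact calls here are illustrative, not a working test:

```scala
// Illustrative sketch of the test scenario described above, assuming the
// kafka010 suite's embedded-broker helpers; names and signatures are
// approximations of the real test utilities.
test("subscribePattern picks up topics created after the stream starts") {
  val reader = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    .option("subscribePattern", "pattern-.*") // matches no topic yet

  // 1. start the query while no topic matches the pattern
  // 2. create a matching topic: testUtils.createTopic("pattern-1", partitions = 1)
  // 3. send data: testUtils.sendMessages("pattern-1", Array("1", "2", "3"))
  // 4. assert those messages eventually appear in the stream's output
}
```

The DStream side would need the analogous scenario against KafkaUtils.createDirectStream with a subscribe-pattern ConsumerStrategy.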
[jira] [Updated] (SPARK-18258) Sinks need access to offset representation
[ https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cody Koeninger updated SPARK-18258:
---

Description:
Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.

I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.

was:
Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.

I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.
> I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18258) Sinks need access to offset representation
Cody Koeninger created SPARK-18258:
--

Summary: Sinks need access to offset representation
Key: SPARK-18258
URL: https://issues.apache.org/jira/browse/SPARK-18258
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Reporter: Cody Koeninger

Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.

I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17938) Backpressure rate not adjusting
[ https://issues.apache.org/jira/browse/SPARK-17938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15629332#comment-15629332 ]

Cody Koeninger commented on SPARK-17938:

Direct stream isn't a receiver, receiver settings don't apply to it.

> Backpressure rate not adjusting
> ---
>
> Key: SPARK-17938
> URL: https://issues.apache.org/jira/browse/SPARK-17938
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Samy Dindane
>
> spark-streaming 2.0.1 and spark-streaming-kafka-0-10 version is 2.0.1. Same behavior with 2.0.0 though.
> spark.streaming.kafka.consumer.poll.ms is set to 3
> spark.streaming.kafka.maxRatePerPartition is set to 10
> spark.streaming.backpressure.enabled is set to true
> `batchDuration` of the streaming context is set to 1 second.
> I consume a Kafka topic using KafkaUtils.createDirectStream().
> My system can handle 100k records batches, but it'd take more than 1 seconds to process them all. I'd thus expect the backpressure to reduce the number of records that would be fetched in the next batch to keep the processing delay inferior to 1 second.
> Only this does not happen and the rate of the backpressure stays the same: stuck in `100.0`, no matter how the other variables change (processing time, error, etc.).
> Here's a log showing how all these variables change but the chosen rate stays the same: https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba (I would have attached a file but I don't see how).
> Is this the expected behavior and I am missing something, or is this a bug? I'll gladly help by providing more information or writing code if necessary. Thank you.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
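[Editor's note] The distinction in the comment above, as a config fragment: the direct stream's rate limits are per Kafka partition, while the receiver-based settings live under a different namespace and have no effect on it. The numeric values below are illustrative.

```scala
import org.apache.spark.SparkConf

// Settings that apply to the direct stream (per Kafka partition):
val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000") // records/sec/partition, example value

// Receiver-only settings such as spark.streaming.receiver.maxRate or
// spark.streaming.blockInterval do NOT apply to KafkaUtils.createDirectStream.
```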
[jira] [Commented] (SPARK-18212) Flaky test: org.apache.spark.sql.kafka010.KafkaSourceSuite.assign from specific offsets
[ https://issues.apache.org/jira/browse/SPARK-18212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15627838#comment-15627838 ] Cody Koeninger commented on SPARK-18212: So here's a heavily excerpted version of what I see happening in that log: {code} 16/11/01 14:08:46.593 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO KafkaTestUtils: Sent 34 to partition 2, offset 3 16/11/01 14:08:46.593 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO KafkaProducer: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 16/11/01 14:08:46.596 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO KafkaTestUtils: Created consumer to get latest offsets 16/11/01 14:08:47.833 Executor task launch worker-2 ERROR Executor: Exception in task 1.0 in stage 29.0 (TID 142) java.lang.AssertionError: assertion failed: Failed to get records for spark-kafka-source-a9485cc4-c83d-4e97-a20e-3960565b3fdb-335403166-execut\ or topic-5-2 3 after polling for 512 16/11/01 14:08:49.252 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO KafkaTestUtils: Closed consumer to get latest offsets 16/11/01 14:08:49.252 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO KafkaSourceSuite: Added data, expected offset [(topic-5-0,4), (topic-5-1,4), (topic-5-2,4), (topic-5-3,4), (topic-5-4,4)] {code} We're waiting on the producer's send future for up to 10 seconds; it takes almost 3 seconds between when the producer send finishes and the consumer that's being used to verify the post-send offsets finishes; but in the meantime we're only waiting half a second for executor fetches. It's really ugly, but probably the easiest way to make this less flaky is to increase the value of kafkaConsumer.pollTimeoutMs to the same order of magnitude being used for the other test waits. [~zsxwing] unless you see anything else wrong in the log or have a better idea, I can put in a pr tomorrow to increase that poll timeout in tests. 
> Flaky test: org.apache.spark.sql.kafka010.KafkaSourceSuite.assign from > specific offsets > --- > > Key: SPARK-18212 > URL: https://issues.apache.org/jira/browse/SPARK-18212 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Reporter: Davies Liu > > https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.3/1968/testReport/junit/org.apache.spark.sql.kafka010/KafkaSourceSuite/assign_from_specific_offsets/ -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
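The timing mismatch described in the comment above can be made concrete with a toy check (illustrative Python, not part of the Scala test suite; the `waits` values are approximated from the log excerpt and test configuration):

```python
def odd_one_out(timeouts_ms, factor=10):
    """Flag waits that are more than `factor`x shorter than the longest one."""
    longest = max(timeouts_ms.values())
    return [name for name, t in timeouts_ms.items() if longest / t > factor]

# approximate values from the log excerpt above
waits = {"producer send future": 10_000,
         "offset verification": 3_000,
         "executor poll": 512}
odd_one_out(waits)  # flags only the executor poll timeout
```

The 512 ms executor poll is more than an order of magnitude below the other waits, which is why raising kafkaConsumer.pollTimeoutMs into the same range is the cheap fix.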
[jira] [Commented] (SPARK-17935) Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
[ https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15626774#comment-15626774 ] Cody Koeninger commented on SPARK-17935: Some other things to think about: - are there any producer configurations you don't want to support? - specifically, are you only going to support byte array serializers for writing key and value? - if you're only supporting byte array, how do you clearly document for users how to handle their common use case (i.e. I have one string column I want to be the key, and the other columns should be json name/value pairs in the message) > Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module > -- > > Key: SPARK-17935 > URL: https://issues.apache.org/jira/browse/SPARK-17935 > Project: Spark > Issue Type: Improvement > Components: SQL, Streaming >Affects Versions: 2.0.0 >Reporter: zhangxinyu > > Now spark already supports kafkaInputStream. It would be useful that we add > `KafkaForeachWriter` to output results to kafka in structured streaming > module. > `KafkaForeachWriter.scala` is put in external kafka-0.8.0. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
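The common use case mentioned in the comment (one string column as the key, every other column as JSON name/value pairs in the message) could be documented for a byte-array-only sink roughly like this; a Python sketch with a hypothetical helper name, not the actual Scala writer:

```python
import json

def row_to_kafka_record(row, key_column):
    """Hypothetical helper: turn a row (dict of column -> value) into a
    (key_bytes, value_bytes) pair for a byte-array-only Kafka sink.
    The key column becomes a UTF-8 key; all other columns become JSON."""
    key = str(row[key_column]).encode("utf-8")
    value_cols = {k: v for k, v in row.items() if k != key_column}
    value = json.dumps(value_cols, sort_keys=True).encode("utf-8")
    return key, value

key, value = row_to_kafka_record({"id": "user-1", "score": 3, "ok": True}, "id")
```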
[jira] [Commented] (SPARK-17935) Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
[ https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15612663#comment-15612663 ] Cody Koeninger commented on SPARK-17935: So the main thing to point out is that Kafka producers currently aren't idempotent, so this sink can't be fault-tolerant. Regarding the design doc, couple of comments - KafkaSinkRDD Why is this necessary? Seems like KafkaSink should do basically the same as existing ForeachSink class - CachedKafkaProducer Why is this necessary? A singleton producer per JVM is generally what's recommended by kafka docs. > Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module > -- > > Key: SPARK-17935 > URL: https://issues.apache.org/jira/browse/SPARK-17935 > Project: Spark > Issue Type: Improvement > Components: SQL, Streaming >Affects Versions: 2.0.0 >Reporter: zhangxinyu > > Now spark already supports kafkaInputStream. It would be useful that we add > `KafkaForeachWriter` to output results to kafka in structured streaming > module. > `KafkaForeachWriter.scala` is put in external kafka-0.8.0. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
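For reference, the singleton-producer-per-process pattern the Kafka docs recommend looks roughly like this (a Python sketch of the pattern only; `FakeProducer` is a stand-in for an expensive-to-create Kafka client, not a real one):

```python
import threading

class FakeProducer:
    """Stand-in for a Kafka producer; creation is assumed expensive."""
    instances = 0
    def __init__(self, config):
        FakeProducer.instances += 1
        self.config = config

_lock = threading.Lock()
_producer = None

def get_producer(config):
    """Return one shared producer per process, creating it lazily."""
    global _producer
    with _lock:
        if _producer is None:
            _producer = FakeProducer(config)
        return _producer

p1 = get_producer({"bootstrap.servers": "localhost:9092"})
p2 = get_producer({"bootstrap.servers": "localhost:9092"})
assert p1 is p2  # created exactly once, then reused by every caller
```

Since the producer is thread-safe and batches across callers, a per-JVM singleton like this is usually all the caching that's needed, which is the question being raised about a separate CachedKafkaProducer.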
[jira] [Commented] (SPARK-17829) Stable format for offset log
[ https://issues.apache.org/jira/browse/SPARK-17829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15610399#comment-15610399 ] Cody Koeninger commented on SPARK-17829: I'm not telling you to do it that way, just asking if you had considered it. General advantage of typeclasses being separating concerns (should all these classes need to know about json) and getting inductive definitions for free (if you have a serializer for container, you have a serializer for any container of nested serializable). If all the stuff you're looking at modifying already knows about java serialization it may not be a big deal though. Specifically about the using a seq instead of array for compactible file stream, isn't there an existing warning in the code as to why that's using an array, due to pathological behavior on large linked lists? > Stable format for offset log > > > Key: SPARK-17829 > URL: https://issues.apache.org/jira/browse/SPARK-17829 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust >Assignee: Tyson Condie > > Currently we use java serialization for the WAL that stores the offsets > contained in each batch. This has two main issues: > - It can break across spark releases (though this is not the only thing > preventing us from upgrading a running query) > - It is unnecessarily opaque to the user. > I'd propose we require offsets to provide a user readable serialization and > use that instead. JSON is probably a good option. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
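The typeclass point, "if you have a serializer for container, you have a serializer for any container of nested serializable," can be sketched like this (illustrative Python registry standing in for Scala implicits; not how the offset log actually works):

```python
import json

# Minimal typeclass-style setup: one serializer registered per base type,
# plus a single generic rule that derives a serializer for lists of any
# serializable element -- the "inductive definitions for free" part.
serializers = {}

def register(tp, fn):
    serializers[tp] = fn

def serialize(value):
    if isinstance(value, list):
        # derived instance: a list is serializable iff its elements are
        return "[" + ",".join(serialize(v) for v in value) + "]"
    return serializers[type(value)](value)

register(int, lambda n: str(n))
register(str, lambda s: json.dumps(s))

serialize([1, 2, 3])        # list serializer derived from the int one
serialize([["a"], ["b"]])   # nesting composes automatically
```

The separation of concerns is that neither int nor str nor list "knows about json" as a whole; each instance handles only its own layer.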
[jira] [Commented] (SPARK-17829) Stable format for offset log
[ https://issues.apache.org/jira/browse/SPARK-17829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15609653#comment-15609653 ] Cody Koeninger commented on SPARK-17829: Have you considered using a typeclass? > Stable format for offset log > > > Key: SPARK-17829 > URL: https://issues.apache.org/jira/browse/SPARK-17829 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust >Assignee: Tyson Condie > > Currently we use java serialization for the WAL that stores the offsets > contained in each batch. This has two main issues: > - It can break across spark releases (though this is not the only thing > preventing us from upgrading a running query) > - It is unnecessarily opaque to the user. > I'd propose we require offsets to provide a user readable serialization and > use that instead. JSON is probably a good option. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.1.0
Cody Koeninger created SPARK-18057: -- Summary: Update structured streaming kafka from 10.0.1 to 10.1.0 Key: SPARK-18057 URL: https://issues.apache.org/jira/browse/SPARK-18057 Project: Spark Issue Type: Sub-task Reporter: Cody Koeninger There are a couple of relevant KIPs here, https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18056) Update KafkaDStreams from 10.0.1 to 10.1.0
[ https://issues.apache.org/jira/browse/SPARK-18056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-18056: --- Description: There are a couple of relevant KIPs here, https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html > Update KafkaDStreams from 10.0.1 to 10.1.0 > -- > > Key: SPARK-18056 > URL: https://issues.apache.org/jira/browse/SPARK-18056 > Project: Spark > Issue Type: Improvement >Reporter: Cody Koeninger > > There are a couple of relevant KIPs here, > https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18056) Update KafkaDStreams from 10.0.1 to 10.1.0
Cody Koeninger created SPARK-18056: -- Summary: Update KafkaDStreams from 10.0.1 to 10.1.0 Key: SPARK-18056 URL: https://issues.apache.org/jira/browse/SPARK-18056 Project: Spark Issue Type: Improvement Reporter: Cody Koeninger -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17829) Stable format for offset log
[ https://issues.apache.org/jira/browse/SPARK-17829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15593000#comment-15593000 ] Cody Koeninger commented on SPARK-17829: At least with regard to kafka offsets, it might be good to keep this the same format as in SPARK-17812 > Stable format for offset log > > > Key: SPARK-17829 > URL: https://issues.apache.org/jira/browse/SPARK-17829 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust >Assignee: Tyson Condie > > Currently we use java serialization for the WAL that stores the offsets > contained in each batch. This has two main issues: > - It can break across spark releases (though this is not the only thing > preventing us from upgrading a running query) > - It is unnecessarily opaque to the user. > I'd propose we require offsets to provide a user readable serialization and > use that instead. JSON is probably a good option. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18033) Deprecate TaskContext.partitionId
Cody Koeninger created SPARK-18033: -- Summary: Deprecate TaskContext.partitionId Key: SPARK-18033 URL: https://issues.apache.org/jira/browse/SPARK-18033 Project: Spark Issue Type: Improvement Reporter: Cody Koeninger Mark TaskContext.partitionId as deprecated, because it doesn't always reflect the physical index at the time the RDD is created. Add a foreachPartitionWithIndex method to mirror the existing mapPartitionsWithIndex method. For background, see http://apache-spark-developers-list.1001551.n3.nabble.com/PSA-TaskContext-partitionId-the-actual-logical-partition-index-td19524.html -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
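The proposed foreachPartitionWithIndex can be expressed in terms of the existing mapPartitionsWithIndex, as this toy Python model shows (plain lists of lists stand in for an RDD's partitions; names mirror the Spark API but this is not Spark code):

```python
def map_partitions_with_index(partitions, f):
    """Toy model of RDD.mapPartitionsWithIndex: f gets (index, iterator)."""
    return [list(f(i, iter(p))) for i, p in enumerate(partitions)]

def foreach_partition_with_index(partitions, f):
    """The proposed foreachPartitionWithIndex, expressed as a
    mapPartitionsWithIndex that produces no output (side effects only)."""
    def wrapper(i, it):
        f(i, it)
        return iter(())  # discard results, mirroring Spark's Unit return
    map_partitions_with_index(partitions, wrapper)

seen = []
foreach_partition_with_index([[1, 2], [3]],
                             lambda i, it: seen.append((i, list(it))))
```

The point of the ticket is that the index passed here is the physical partition index at creation time, which TaskContext.partitionId does not reliably give you.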
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15586417#comment-15586417 ] Cody Koeninger commented on SPARK-17147: If that's something you're seeing regularly, probably worth bringing it up on the mailing list, with a full stacktrace and whatever background you have > Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets > (i.e. Log Compaction) > -- > > Key: SPARK-17147 > URL: https://issues.apache.org/jira/browse/SPARK-17147 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Robert Conrad > > When Kafka does log compaction offsets often end up with gaps, meaning the > next requested offset will be frequently not be offset+1. The logic in > KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset > will always be just an increment of 1 above the previous offset. > I have worked around this problem by changing CachedKafkaConsumer to use the > returned record's offset, from: > {{nextOffset = offset + 1}} > to: > {{nextOffset = record.offset + 1}} > and changed KafkaRDD from: > {{requestOffset += 1}} > to: > {{requestOffset = r.offset() + 1}} > (I also had to change some assert logic in CachedKafkaConsumer). > There's a strong possibility that I have misconstrued how to use the > streaming kafka consumer, and I'm happy to close this out if that's the case. > If, however, it is supposed to support non-consecutive offsets (e.g. due to > log compaction) I am also happy to contribute a PR. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15586397#comment-15586397 ] Cody Koeninger commented on SPARK-17147: Then no, this issue is unlikely to affect you unless there's something wrong with your topic. > Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets > (i.e. Log Compaction) > -- > > Key: SPARK-17147 > URL: https://issues.apache.org/jira/browse/SPARK-17147 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Robert Conrad > > When Kafka does log compaction offsets often end up with gaps, meaning the > next requested offset will be frequently not be offset+1. The logic in > KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset > will always be just an increment of 1 above the previous offset. > I have worked around this problem by changing CachedKafkaConsumer to use the > returned record's offset, from: > {{nextOffset = offset + 1}} > to: > {{nextOffset = record.offset + 1}} > and changed KafkaRDD from: > {{requestOffset += 1}} > to: > {{requestOffset = r.offset() + 1}} > (I also had to change some assert logic in CachedKafkaConsumer). > There's a strong possibility that I have misconstrued how to use the > streaming kafka consumer, and I'm happy to close this out if that's the case. > If, however, it is supposed to support non-consecutive offsets (e.g. due to > log compaction) I am also happy to contribute a PR. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17147: --- Summary: Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction) (was: Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets) > Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets > (i.e. Log Compaction) > -- > > Key: SPARK-17147 > URL: https://issues.apache.org/jira/browse/SPARK-17147 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Robert Conrad > > When Kafka does log compaction offsets often end up with gaps, meaning the > next requested offset will be frequently not be offset+1. The logic in > KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset > will always be just an increment of 1 above the previous offset. > I have worked around this problem by changing CachedKafkaConsumer to use the > returned record's offset, from: > {{nextOffset = offset + 1}} > to: > {{nextOffset = record.offset + 1}} > and changed KafkaRDD from: > {{requestOffset += 1}} > to: > {{requestOffset = r.offset() + 1}} > (I also had to change some assert logic in CachedKafkaConsumer). > There's a strong possibility that I have misconstrued how to use the > streaming kafka consumer, and I'm happy to close this out if that's the case. > If, however, it is supposed to support non-consecutive offsets (e.g. due to > log compaction) I am also happy to contribute a PR. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15584172#comment-15584172 ] Cody Koeninger commented on SPARK-17147: Well, are you using compacted topics? > Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets > > > Key: SPARK-17147 > URL: https://issues.apache.org/jira/browse/SPARK-17147 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Robert Conrad > > When Kafka does log compaction offsets often end up with gaps, meaning the > next requested offset will be frequently not be offset+1. The logic in > KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset > will always be just an increment of 1 above the previous offset. > I have worked around this problem by changing CachedKafkaConsumer to use the > returned record's offset, from: > {{nextOffset = offset + 1}} > to: > {{nextOffset = record.offset + 1}} > and changed KafkaRDD from: > {{requestOffset += 1}} > to: > {{requestOffset = r.offset() + 1}} > (I also had to change some assert logic in CachedKafkaConsumer). > There's a strong possibility that I have misconstrued how to use the > streaming kafka consumer, and I'm happy to close this out if that's the case. > If, however, it is supposed to support non-consecutive offsets (e.g. due to > log compaction) I am also happy to contribute a PR. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
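The compaction problem and the proposed `nextOffset = record.offset + 1` workaround can be modeled with a toy consumer (illustrative Python; the real logic lives in the Scala KafkaRDD and CachedKafkaConsumer):

```python
def consume(log, start, end, use_record_offset):
    """Toy model of the consumer iteration. `log` maps offset -> value;
    compaction leaves gaps in the offsets."""
    results = []
    request = start
    for offset in sorted(o for o in log if start <= o < end):
        if not use_record_offset and offset != request:
            # the baked-in assumption: next offset is always previous + 1
            raise AssertionError(f"expected offset {request}, got {offset}")
        results.append(log[offset])
        # proposed fix: advance from the record's actual offset instead
        request = (offset + 1) if use_record_offset else (request + 1)
    return results

compacted = {0: "a", 1: "b", 4: "e", 7: "h"}  # offsets 2,3,5,6 compacted away
consume(compacted, 0, 8, use_record_offset=True)  # tolerates the gaps
```

With `use_record_offset=False` the same range fails as soon as the first gap is hit, which is the behavior the ticket describes.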
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578894#comment-15578894 ] Cody Koeninger commented on SPARK-17812: As you just said yourself, assign doesn't mean you necessarily know the exact starting offsets you want. > More granular control of starting offsets (assign) > -- > > Key: SPARK-17812 > URL: https://issues.apache.org/jira/browse/SPARK-17812 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust >Assignee: Cody Koeninger > > Right now you can only run a Streaming Query starting from either the > earliest or latests offsets available at the moment the query is started. > Sometimes this is a lot of data. It would be nice to be able to do the > following: > - seek to user specified offsets for manually specified topicpartitions > currently agreed on plan: > Mutually exclusive subscription options (only assign is new to this ticket) > {noformat} > .option("subscribe","topicFoo,topicBar") > .option("subscribePattern","topic.*") > .option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""") > {noformat} > where assign can only be specified that way, no inline offsets > Single starting position option with three mutually exclusive types of value > {noformat} > .option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": > 1234, "1": -2}, "topicBar":{"0": -1}}""") > {noformat} > startingOffsets with json fails if any topicpartition in the assignments > doesn't have an offset. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17935) Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
[ https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578141#comment-15578141 ] Cody Koeninger commented on SPARK-17935: Why is this in kafka-0-8, when we haven't resolved (for the third time) whether we're even continuing to work on 0.8? Those modules have conflicting dependencies as is. > Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module > -- > > Key: SPARK-17935 > URL: https://issues.apache.org/jira/browse/SPARK-17935 > Project: Spark > Issue Type: Improvement > Components: SQL, Streaming >Affects Versions: 2.0.0 >Reporter: zhangxinyu > > Now spark already supports kafkaInputStream. It would be useful that we add > `KafkaForeachWriter` to output results to kafka in structured streaming > module. > `KafkaForeachWriter.scala` is put in external kafka-0.8.0. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17938) Backpressure rate not adjusting
[ https://issues.apache.org/jira/browse/SPARK-17938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578133#comment-15578133 ] Cody Koeninger commented on SPARK-17938: There was pretty extensive discussion of this on list, should link or summarize it. Couple of things here: 100 is the default minimum rate for pidestimator. If you're willing to write code, put more logging in to determine why that rate isn't being configured, or hardcode it to a different number. I have successfully adjusted that rate using spark configuration. The other thing is that if your system takes way longer than 1 second to process 100k records, 100k obviously isn't a reasonable max. Many large batches will be defined during the time that first batch is running, before back pressure is involved at all. Try a lower max. > Backpressure rate not adjusting > --- > > Key: SPARK-17938 > URL: https://issues.apache.org/jira/browse/SPARK-17938 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 2.0.0, 2.0.1 >Reporter: Samy Dindane > > spark-streaming 2.0.1 and spark-streaming-kafka-0-10 version is 2.0.1. Same > behavior with 2.0.0 though. > spark.streaming.kafka.consumer.poll.ms is set to 3 > spark.streaming.kafka.maxRatePerPartition is set to 10 > spark.streaming.backpressure.enabled is set to true > `batchDuration` of the streaming context is set to 1 second. > I consume a Kafka topic using KafkaUtils.createDirectStream(). > My system can handle 100k records batches, but it'd take more than 1 seconds > to process them all. I'd thus expect the backpressure to reduce the number of > records that would be fetched in the next batch to keep the processing delay > inferior to 1 second. > Only this does not happen and the rate of the backpressure stays the same: > stuck in `100.0`, no matter how the other variables change (processing time, > error, etc.). 
> Here's a log showing how all these variables change but the chosen rate stays > the same: https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba (I > would have attached a file but I don't see how). > Is this the expected behavior and I am missing something, or is this a bug? > I'll gladly help by providing more information or writing code if necessary. > Thank you. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
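A rough model of how these rates interact (illustrative Python only; the actual sizing logic in DirectKafkaInputDStream is more involved, and `min_rate=100.0` mirrors the PIDRateEstimator default minimum mentioned in the comment):

```python
def records_per_batch(num_partitions, max_rate_per_partition,
                      backpressure_rate, batch_seconds, min_rate=100.0):
    """Rough model of direct stream batch sizing: each partition is capped
    by both the static max rate and the (floored) backpressure rate."""
    effective = min(max_rate_per_partition, max(backpressure_rate, min_rate))
    return int(num_partitions * effective * batch_seconds)

# With a huge maxRatePerPartition and backpressure pinned at its 100 floor,
# only ~100 records/partition/second are admitted once backpressure kicks in:
records_per_batch(1, 100_000, 100.0, 1.0)
```

It also shows the other point in the comment: until the first batches finish and backpressure engages, sizing is governed by the max rate alone, so an unreasonably high max means many oversized batches get queued first.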
[jira] [Commented] (SPARK-17813) Maximum data per trigger
[ https://issues.apache.org/jira/browse/SPARK-17813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15577037#comment-15577037 ] Cody Koeninger commented on SPARK-17813: To be clear, the current direct stream (and as a result structured stream) straight up will not work with compacted topics currently, because of the assumption that offset ranges are contiguous. There's a ticket for it SPARK-17147 with a prototype solution, waiting for feedback from a user on it. So for global maxOffsetsPerTrigger are you saying a spark configuration? Is there a reason not to make that a maxRowsPerTrigger (or messages, or whatever name) so that it can potentially be reused by other sources? I think for this a proportional distribution of offsets shouldn't be too hard. I can pick this up once the assign stuff is stabilized. > Maximum data per trigger > > > Key: SPARK-17813 > URL: https://issues.apache.org/jira/browse/SPARK-17813 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust > > At any given point in a streaming query execution, we process all available > data. This maximizes throughput at the cost of latency. We should add > something similar to the {{maxFilesPerTrigger}} option available for files. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
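The proportional distribution suggested at the end of the comment could look like this (illustrative Python sketch with hypothetical names; skewed partition backlogs share a single per-trigger cap in proportion to their size):

```python
def distribute_offsets(available, max_offsets):
    """Sketch of a global per-trigger cap shared proportionally.
    `available` maps partition -> number of unread offsets."""
    total = sum(available.values())
    if total <= max_offsets:
        return dict(available)
    # integer truncation; a real implementation would also hand out
    # the leftover remainder somewhere
    return {p: int(n * max_offsets / total) for p, n in available.items()}

# Skewed partitions share a cap of 100 in proportion to their backlog:
distribute_offsets({"t-0": 900, "t-1": 90, "t-2": 10}, 100)
```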
[jira] [Updated] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17812: --- Description: Right now you can only run a Streaming Query starting from either the earliest or latests offsets available at the moment the query is started. Sometimes this is a lot of data. It would be nice to be able to do the following: - seek to user specified offsets for manually specified topicpartitions currently agreed on plan: Mutually exclusive subscription options (only assign is new to this ticket) {noformat} .option("subscribe","topicFoo,topicBar") .option("subscribePattern","topic.*") .option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""") {noformat} where assign can only be specified that way, no inline offsets Single starting position option with three mutually exclusive types of value {noformat} .option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 1234, "1": -2}, "topicBar":{"0": -1}}""") {noformat} startingOffsets with json fails if any topicpartition in the assignments doesn't have an offset. was: Right now you can only run a Streaming Query starting from either the earliest or latests offsets available at the moment the query is started. Sometimes this is a lot of data. It would be nice to be able to do the following: - seek to user specified offsets for manually specified topicpartitions > More granular control of starting offsets (assign) > -- > > Key: SPARK-17812 > URL: https://issues.apache.org/jira/browse/SPARK-17812 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust >Assignee: Cody Koeninger > > Right now you can only run a Streaming Query starting from either the > earliest or latests offsets available at the moment the query is started. > Sometimes this is a lot of data. 
It would be nice to be able to do the > following: > - seek to user specified offsets for manually specified topicpartitions > currently agreed on plan: > Mutually exclusive subscription options (only assign is new to this ticket) > {noformat} > .option("subscribe","topicFoo,topicBar") > .option("subscribePattern","topic.*") > .option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""") > {noformat} > where assign can only be specified that way, no inline offsets > Single starting position option with three mutually exclusive types of value > {noformat} > .option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": > 1234, "1": -2}, "topicBar":{"0": -1}}""") > {noformat} > startingOffsets with json fails if any topicpartition in the assignments > doesn't have an offset. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
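The agreed-on startingOffsets semantics above (an explicit offset required for every assigned topicpartition, with -2 meaning earliest and -1 meaning latest, failing otherwise) can be sketched as follows (illustrative Python; the real parsing lives in the Scala Kafka source):

```python
import json

EARLIEST, LATEST = -2, -1  # sentinel offsets used in the JSON format

def parse_starting_offsets(assignments, starting_json):
    """Sketch of the rule: every assigned (topic, partition) must have an
    explicit offset in the startingOffsets JSON, or the query fails."""
    spec = json.loads(starting_json)
    offsets = {}
    for topic, partition in assignments:
        per_topic = spec.get(topic, {})
        if str(partition) not in per_topic:
            raise ValueError(f"no starting offset for {topic}-{partition}")
        offsets[(topic, partition)] = per_topic[str(partition)]
    return offsets

parse_starting_offsets(
    [("topicFoo", 0), ("topicFoo", 1)],
    '{"topicFoo": {"0": 1234, "1": -2}}')
```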
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15577022#comment-15577022 ] Cody Koeninger commented on SPARK-17812: Assign is useful, otherwise you have no way of consuming only particular partitions of a topic. Yeah, I just ended up using jackson tree model directly, as you said the catalyst stuff isn't really applicable. Branch with initial implementation is is at https://github.com/koeninger/spark-1/tree/SPARK-17812 , will send a PR once I have some tests... trying to figure out if there's a reasonable way of unit testing offset out of range, but may just give up on that if it seems flaky. > More granular control of starting offsets (assign) > -- > > Key: SPARK-17812 > URL: https://issues.apache.org/jira/browse/SPARK-17812 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust >Assignee: Cody Koeninger > > Right now you can only run a Streaming Query starting from either the > earliest or latests offsets available at the moment the query is started. > Sometimes this is a lot of data. It would be nice to be able to do the > following: > - seek to user specified offsets for manually specified topicpartitions -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17937: --- Description: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost). It's possible to separate this into offset too small and offset too large, but I'm not sure it matters for us. Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. 
Note that this is effectively indistinguishable from new partition during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. Could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're ok with earliest for a during-query out of range #* Offset out of range on executor: seems like it should be *Fail* or *Earliest*, based on failOnDataLoss, but it looks like this setting is currently ignored, and the executor will just fail... # During query #* New partition: *Earliest*, only. This seems to be by fiat, I see no reason this can't be configurable. #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the latest position #* Offset out of range on executor: ? # At query restart #* New partition: *Checkpoint*, fall back to *Earliest*. Again, no reason this couldn't be configurable fall back to Latest #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the specified position #* Offset out of range on executor: ? I've probably missed something, chime in. was: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost). It's possible to separate this into offset too small and offset too large, but I'm not sure it matters for us. Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. 
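To make the "pre-query configuration" concrete, here is a minimal sketch (topic names and offsets are hypothetical) of building the per-topicpartition startingOffsets JSON; in that JSON the Kafka source treats -2 as earliest and -1 as latest:

```python
import json

# Hypothetical layout: start topicA partition 0 at offset 23,
# partition 1 at latest (-1), and topicB partition 0 at earliest (-2).
starting_offsets = {
    "topicA": {"0": 23, "1": -1},
    "topicB": {"0": -2},
}

# Options as they would be passed to the Kafka source, as strings.
opts = {
    "startingOffsets": json.dumps(starting_offsets),
    # "false" => fall back to Earliest on data loss; "true" => Fail the query
    "failOnDataLoss": "false",
}
print(opts["startingOffsets"])
```

The JSON string round-trips, so a query restart or an external tool can parse it back into the same per-partition map.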
[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17937: --- Description: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost). It's possible to separate this into offset too small and offset too large, but I'm not sure it matters for us. Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. 
Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. Could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're ok with earliest for a during-query out of range #* Offset out of range on executor: seems like it should be*Fail* or *Earliest*, based on failOnDataLoss. but it looks like this setting is currently ignored, and the executor will just fail... # During query #* New partition: *Earliest*, only. This seems to be by fiat, I see no reason this can't be configurable. #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the latest position #* Offset out of range on executor: ? # At query restart #* New partition: *Checkpoint*, fall back to *Earliest*. Again, no reason this couldn't be configurable fall back to Latest #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the specified position #* Offset out of range on executor: ? I've probably missed something, chime in. was: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost). It's possible to separate this into offset too small and offset too large, but I'm not sure it matters for us. Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. 
Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. Note that this is effectively undistinguishable from new parititon during que
[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17937: --- Description: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost). It's possible to separate this into offset too small and offset too large, but I'm not sure it matters for us. Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. 
Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. Could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're ok with earliest for a during-query out of range #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss. # During query #* New partition: *Earliest*, only. This seems to be by fiat, I see no reason this can't be configurable. #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the latest position #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss # At query restart #* New partition: *Checkpoint*, fall back to *Earliest*. Again, no reason this couldn't be configurable fall back to Latest #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the specified position #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss I've probably missed something, chime in. was: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost) Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. 
This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in th
[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17937: --- Description: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost) Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. 
Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. Could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're ok with earliest for a during-query out of range #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss. # During query #* New partition: *Earliest*, only. This seems to be by fiat, I see no reason this can't be configurable. #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the latest position #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss # At query restart #* New partition: *Checkpoint*, fall back to *Earliest*. Again, no reason this couldn't be configurable fall back to Latest #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the specified position #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss I've probably missed something, chime in. was: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost) Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. 
This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. Could use
[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17937: --- Description: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost) Possible sources of offsets: # Earliest position in log # Latest position in log # Fail and kill the query # Checkpoint position # User specified per topicpartition # Kafka commit log. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # Timestamp. Currently unsupported. This could be supported with old, inaccurate Kafka time api, or upcoming time index # X offsets before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: earliest OR latest OR json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies Fail above) OR false (which implies Earliest above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is earliest or latest, use that. If startingOffsets is perTopicpartition, and the new partition isn't in the map, Fail. Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. 
Could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're ok with earliest for a during-query out of range #* Offset out of range on executor: Fail or Earliest, based on failOnDataLoss. # During query #* New partition: Earliest, only. This seems to be by fiat, I see no reason this can't be configurable. #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the latest position #* Offset out of range on executor: Fail or Earliest, based on failOnDataLoss # At query restart #* New partition: Checkpoint, fall back to Earliest. Again, no reason this couldn't be configurable fall back to Latest #* Offset out of range on driver: Fail or Earliest, based on FailOnDataLoss #* Offset out of range on executor: Fail or Earliest, based on FailOnDataLoss I've probably missed something, chime in. was: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost) Possible sources of offsets: # Earliest position in log # Latest position in log # Fail and kill the query # Checkpoint position # User specified per topicpartition # Kafka commit log. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # Timestamp. Currently unsupported. This could be supported with old, inaccurate Kafka time api, or upcoming time index # X offsets before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. 
Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: earliest OR latest OR json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies Fail above) OR false (which implies Earliest above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is earliest or latest, use that. If startingOffsets is perTopicpartition, and the new partition isn't in the map, Fail. Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. Could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're ok with earliest for a during-query out of range #* Offset out of range on exe
[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17937: --- Description: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost) Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: earliest OR latest OR User specified json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies Fail above) OR false (which implies Earliest above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. 
Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. Could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're ok with earliest for a during-query out of range #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss. # During query #* New partition: *Earliest*, only. This seems to be by fiat, I see no reason this can't be configurable. #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the latest position #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss # At query restart #* New partition: *Checkpoint*, fall back to *Earliest*. Again, no reason this couldn't be configurable fall back to Latest #* Offset out of range on driver: *Fail* or *Earliest*, based on failOnDataLoss #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss I've probably missed something, chime in. was: Possible events for which offsets are needed: # *New partition* is discovered # *Offset out of range* (aka, data has been lost) Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. 
This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: earliest OR latest OR User specified json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies Fail above) OR false (which implies Earliest above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. Could use the value of failOnDataLoss, but it's possible people may want t
[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17937: --- Description: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost) Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. 
Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. Could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're ok with earliest for a during-query out of range #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss. # During query #* New partition: *Earliest*, only. This seems to be by fiat, I see no reason this can't be configurable. #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the latest position #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss # At query restart #* New partition: *Checkpoint*, fall back to *Earliest*. Again, no reason this couldn't be configurable fall back to Latest #* Offset out of range on driver: *Fail* or *Earliest*, based on failOnDataLoss #* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss I've probably missed something, chime in. was: Possible events for which offsets are needed: # New partition is discovered # Offset out of range (aka, data has been lost) Possible sources of offsets: # *Earliest* position in log # *Latest* position in log # *Fail* and kill the query # *Checkpoint* position # *User specified* per topicpartition # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled. # *Timestamp*. Currently unsupported. 
This could be supported with old, inaccurate Kafka time api, or upcoming time index # *X offsets* before or after latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets. Currently allowed pre-query configuration, all "ORs" are exclusive: # startingOffsets: earliest OR latest OR User specified json per topicpartition (SPARK-17812) # failOnDataLoss: true (which implies Fail above) OR false (which implies Earliest above) In general, I see no reason this couldn't specify Latest as an option. Possible lifecycle times in which an offset-related event may happen: # At initial query start #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* perTopicpartition, and the new partition isn't in the map, *Fail*. Note that this is effectively undistinguishable from new parititon during query, because partitions may have changed in between pre-query configuration and query start, but we treat it differently, and users in this case are SOL #* Offset out of range on driver: We don't technically have behavior for this case yet. Could use the value of failOnDataLoss, but it's possible people may
[jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17937: --- Description:

Possible events for which offsets are needed:
# *New partition* is discovered
# *Offset out of range* (aka, data has been lost)

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing Kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled.
# *Timestamp*. Currently unsupported. This could be supported with the old, inaccurate Kafka time API, or the upcoming time index.
# *X offsets* before or after the latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single range of offsets.

Currently allowed pre-query configuration, all "ORs" exclusive:
# startingOffsets: earliest OR latest OR user-specified JSON per topicpartition (SPARK-17812)
# failOnDataLoss: true (which implies Fail above) OR false (which implies Earliest above). In general, I see no reason this couldn't specify Latest as an option.

Possible lifecycle times at which an offset-related event may happen:
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* per topicpartition, and the new partition isn't in the map, *Fail*. Note that this is effectively indistinguishable from a new partition during the query, because partitions may have changed between pre-query configuration and query start, but we treat it differently, and users in this case are SOL.
#* Offset out of range on driver: we don't technically have behavior for this case yet. Could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're OK with earliest for a during-query out of range.
#* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss.
# During query
#* New partition: *Earliest*, only. This seems to be by fiat; I see no reason this can't be configurable.
#* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the latest position.
#* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss.
# At query restart
#* New partition: *Checkpoint*, fall back to *Earliest*. Again, no reason this couldn't configurably fall back to Latest.
#* Offset out of range on driver: *Fail* or *Earliest*, based on failOnDataLoss.
#* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss.

I've probably missed something, chime in.
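For discussion's sake, the event/lifecycle matrix in the description above can be collapsed into a small decision table. The following Python sketch is purely illustrative (the `resolve` function and its string labels are hypothetical, not Spark internals): failOnDataLoss selects Fail vs. Earliest, a per-topicpartition startingOffsets map fails on unknown new partitions, and restart falls back from checkpoint to earliest.

```python
def resolve(event, phase, fail_on_data_loss, starting_offsets="latest"):
    """Return the offset source for an offset-related event.

    event: "new_partition" or "offset_out_of_range"
    phase: "start", "running", or "restart"
    starting_offsets: "earliest", "latest", or a per-topicpartition dict
    """
    # failOnDataLoss=true implies Fail; false implies Earliest.
    loss_action = "fail" if fail_on_data_loss else "earliest"
    if event == "new_partition":
        if phase == "start":
            # A user-specified per-partition map fails on partitions
            # that aren't in the map; earliest/latest is used directly.
            return "fail" if isinstance(starting_offsets, dict) else starting_offsets
        if phase == "running":
            return "earliest"  # currently by fiat, per the discussion
        # restart: checkpoint position, falling back to earliest
        return "checkpoint_or_earliest"
    if event == "offset_out_of_range":
        return loss_action
    raise ValueError(f"unknown event: {event}")
```

Writing the matrix this way makes the asymmetries in the discussion visible: only the out-of-range cases are tunable via failOnDataLoss, while the new-partition cases are fixed.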
[jira] [Created] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming
Cody Koeninger created SPARK-17937: -- Summary: Clarify Kafka offset semantics for Structured Streaming Key: SPARK-17937 URL: https://issues.apache.org/jira/browse/SPARK-17937 Project: Spark Issue Type: Sub-task Reporter: Cody Koeninger -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573843#comment-15573843 ] Cody Koeninger commented on SPARK-17812:

So I think this is what we're agreed on:

Mutually exclusive subscription options (only assign is new to this ticket):
{noformat}
.option("subscribe", "topicFoo,topicBar")
.option("subscribePattern", "topic.*")
.option("assign", """{"topicFoo": [0, 1], "topicBar": [0, 1]}""")
{noformat}
where assign can only be specified that way, no inline offsets.

Single starting-position option with three mutually exclusive types of value:
{noformat}
.option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 1234, "1": -2}, "topicBar": {"0": -1}}""")
{noformat}
startingOffsets with JSON fails if any topicpartition in the assignments doesn't have an offset.

Sound right? I'll go ahead and start on it. I'm assuming I should try to reuse some of the existing catalyst Jackson stuff, and keep in mind a format that's potentially usable by the checkpoints as well?

I don't think earliest / latest is too unclear as long as there's a way to get to the other knobs that auto.offset.reset (should have) provided. Punting the tunability of new partitions to another ticket sounds good.

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
> Assignee: Cody Koeninger
>
> Right now you can only run a Streaming Query starting from either the
> earliest or latest offsets available at the moment the query is started.
> Sometimes this is a lot of data. It would be nice to be able to do the
> following:
> - seek to user specified offsets for manually specified topicpartitions

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
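The validation rule agreed above ("startingOffsets with json fails if any topicpartition in the assignments doesn't have an offset") can be sketched as plain JSON handling. This is an illustrative outline only, not Spark's actual implementation; the function name `validate_starting_offsets` is made up for the example. The -2 and -1 sentinels follow Kafka's convention for earliest/latest.

```python
import json

def validate_starting_offsets(assign_json, starting_offsets_json):
    """Fail if any assigned topicpartition lacks a starting offset."""
    assign = json.loads(assign_json)              # {"topic": [partition, ...]}
    offsets = json.loads(starting_offsets_json)   # {"topic": {"partition": offset}}
    missing = [
        (topic, p)
        for topic, parts in assign.items()
        for p in parts
        if str(p) not in offsets.get(topic, {})
    ]
    if missing:
        raise ValueError(f"no starting offset for topicpartitions: {missing}")
    return offsets

# Matches the agreed example: -2 means earliest, -1 means latest.
offsets = validate_starting_offsets(
    '{"topicFoo": [0, 1], "topicBar": [0]}',
    '{"topicFoo": {"0": 1234, "1": -2}, "topicBar": {"0": -1}}',
)
```

Failing eagerly here, before the query starts, is what makes the semantics clear: the user finds out at configuration time rather than when a partition silently starts somewhere unexpected.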
[jira] [Commented] (SPARK-17813) Maximum data per trigger
[ https://issues.apache.org/jira/browse/SPARK-17813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573806#comment-15573806 ] Cody Koeninger commented on SPARK-17813:

So, issues to be worked out here (assuming we're still ignoring compacted topics):

maxOffsetsPerTrigger - how are these maximums distributed among partitions? What about skewed topics / partitions?

maxOffsetsPerTopicPartitionPerTrigger - (this isn't just hypothetical, e.g. SPARK-17510) If we do this, how is this configuration communicated?
{noformat}
option("maxOffsetsPerTopicPartitionPerTrigger", """{"topicFoo": {"0": 600}, "topicBar": {"0": 300, "1": 600}}""")
{noformat}
{noformat}
option("maxOffsetsPerTopicPerTrigger", """{"topicFoo": 600, "topicBar": 300}""")
{noformat}

> Maximum data per trigger
>
> Key: SPARK-17813
> URL: https://issues.apache.org/jira/browse/SPARK-17813
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
>
> At any given point in a streaming query execution, we process all available
> data. This maximizes throughput at the cost of latency. We should add
> something similar to the {{maxFilesPerTrigger}} option available for files.
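One possible answer to "how are these maximums distributed among partitions?" is to split a global maxOffsetsPerTrigger proportionally to each partition's available backlog, which also handles the skew case. The sketch below is a discussion aid under that assumption, not what Spark implements; `distribute` is a hypothetical name.

```python
def distribute(max_offsets, backlog):
    """Split a global per-trigger cap across partitions by backlog.

    backlog: {topicpartition: records available}; returns per-tp limits.
    """
    total = sum(backlog.values())
    if total <= max_offsets:
        return dict(backlog)  # under the cap, no throttling needed
    # Proportional share, rounded down.
    limits = {tp: n * max_offsets // total for tp, n in backlog.items()}
    # Hand the rounding remainder to the most-backlogged partitions first.
    remainder = max_offsets - sum(limits.values())
    for tp in sorted(backlog, key=backlog.get, reverse=True):
        if remainder == 0:
            break
        limits[tp] += 1
        remainder -= 1
    return limits
```

For example, with a cap of 600 and backlogs of 900 and 300, the skewed partition gets 450 and the other 150, so heavily skewed partitions drain faster instead of starving.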
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573766#comment-15573766 ] Cody Koeninger commented on SPARK-17812:

OK, failing on start is clear (it's really annoying in the case of subscribePattern), but at least it's clear. I think that's enough to get started on this ticket; is anyone currently working on it, or can I do it? Ryan seemed worried that it wouldn't get done in time for the next release.

It sounds like your current plan is to ignore whatever comes out of KAFKA-3370, which is fine as long as whatever you do is both clear and equally tunable. Clarity of semantics can't be the only criterion of an API; "You can only start at the latest offset, period" is clear, but a crap API.

{quote} the only case where we lack sufficient tunability is "Where do I go when the current offsets are invalid due to retention?". {quote}

No, you lack sufficient tunability as to where newly discovered partitions start. Keep in mind that those partitions may have been discovered after a significant job downtime. If the point of an API is to provide clear semantics to the user, it is not at all clear to me as a user how I can start those partitions at latest, which I know is possible in the underlying data model.

The reason I'm belaboring this point now is that you have chosen names (earliest, latest) for the API currently under discussion that are confusingly similar to the existing auto offset reset functionality, and you have provided knobs for some, but not all, of the things auto offset reset currently affects. This is going to confuse people; it already confuses me.
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573563#comment-15573563 ] Cody Koeninger commented on SPARK-17812:

So, a short-term question: with your proposed interface, what, as a user, do you expect to happen when you specify startingOffsets for some but not all partitions?

A couple of medium-term questions:
- Yes, auto.offset.reset is a mess. Have you read https://issues.apache.org/jira/browse/KAFKA-3370 ?
- What are you going to do when that ticket is resolved? It should allow users to answer the questions you raised in very specific ways that your interface does not.

And a really leading long-term question:
- Is the purpose of your interface to do what you think users should be able to do, or what they need to be able to do?

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573479#comment-15573479 ] Cody Koeninger commented on SPARK-17812:

While some decision is better than none, can you help me understand why you don't believe me that auto.offset.reset is orthogonal to specifying specific starting positions? Or do you just not believe it's important?

The reason you guys used a different name from auto.offset.reset is that the Kafka project's semantics for it are inadequate. But they will fix it, and when they do, the fact that you have conflated two unrelated things into one configuration in your API is going to cause problems.

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573432#comment-15573432 ] Cody Koeninger edited comment on SPARK-17812 at 10/13/16 10:44 PM:

If you're seriously worried that people are going to get confused,
{noformat}
.option("defaultOffsets", "earliest" | "latest")
.option("specificOffsets", """{"topicFoo": {"0": 1234, "1": 4567}}""")
{noformat}
let those two at least not be mutually exclusive, and punt on the question of precedence until there's an actual startingTime or startingX ticket.

was (Author: c...@koeninger.org):

If you're seriously worried that people are going to get confused,
{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1": 4567}}""")
.option("defaultOffsets", "earliest" | "latest")
{noformat}
let those two at least not be mutually exclusive, and punt on the question of precedence until there's an actual startingTime or startingX ticket.

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
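The defaultOffsets / specificOffsets proposal above amounts to a two-level lookup: a per-topicpartition override map on top of a single default. The option names come from the comment; the code below is only an illustrative sketch of that precedence (a specific offset wins, otherwise the default applies), with `starting_position` as a hypothetical helper name.

```python
import json

def starting_position(topic, partition, default_offsets, specific_offsets_json):
    """Resolve where a topicpartition starts under the proposed options.

    default_offsets: "earliest" or "latest" (the defaultOffsets option)
    specific_offsets_json: the specificOffsets JSON string, e.g.
        '{"topicFoo": {"0": 1234, "1": 4567}}'
    """
    specific = json.loads(specific_offsets_json)
    per_topic = specific.get(topic, {})
    # A user-provided specific offset overrules the general default.
    return per_topic.get(str(partition), default_offsets)
```

This also shows why the two options need not be mutually exclusive: the override map is well-defined regardless of which default is chosen.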
[jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573395#comment-15573395 ] Cody Koeninger edited comment on SPARK-17812 at 10/13/16 10:25 PM:
---
1. We don't have lists, we have strings. Regexes and valid topic names overlap (dot is the obvious one).
2. Mapping directly to Kafka method names means we don't have to come up with some other (weird and possibly overlapping) name when they add more ways to subscribe; we just use theirs.
3. I think this "starting X messages" idea is a mess with Kafka semantics, for the reasons both you and I have already expressed. At any rate, I think Michael already clearly punted the "starting X messages" case to a different ticket.
4. I think it's more than sufficiently clear as suggested; no one is going to expect that a specific offset they provided will be overruled by a general single default. The implementation is also crystal clear: seek to the position identified by startingTime, then seek to any specific offsets for specific partitions.

Yes, this is all bikeshedding, but it's bikeshedding that directly affects what people are actually able to do with the API. Needlessly restricting it for reasons that have nothing to do with safety is just going to piss users off for no reason. Just because you don't have a use case that needs it doesn't mean you should arbitrarily prevent users from doing it. Please, just choose something and let me build it so that people can actually use the thing by the next release.

was (Author: c...@koeninger.org):
1. We don't have lists, we have strings. Regexes and valid topic names overlap (dot is the obvious one).
2. Mapping directly to Kafka method names means we don't have to come up with some other (weird and possibly overlapping) name when they add more ways to subscribe; we just use theirs.
3. I think this is a mess with Kafka semantics for the reasons both you and I have already expressed. At any rate, I think Michael already clearly punted the "starting X" case to a different ticket.
4. I think it's more than sufficiently clear as suggested; no one is going to expect that a specific offset they provided will be overruled by a general single default. The implementation is also crystal clear: seek to the position identified by startingTime, then seek to any specific offsets for specific partitions.

Yes, this is all bikeshedding, but it's bikeshedding that directly affects what people are actually able to do with the API. Needlessly restricting it for reasons that have nothing to do with safety is just going to piss users off for no reason. Just because you don't have a use case that needs it doesn't mean you should arbitrarily prevent users from doing it. Please, just choose something and let me build it so that people can actually use the thing by the next release.

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the
> earliest or latest offsets available at the moment the query is started.
> Sometimes this is a lot of data. It would be nice to be able to do the
> following:
> - seek to user specified offsets for manually specified topicpartitions

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
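The overlap in point 1 is easy to demonstrate: a literal topic name containing a dot, read as a regex, also matches other valid topic names, so a string alone can't tell you which subscription mode was meant. A quick sketch in Python (the topic names are made up for illustration):

```python
import re

# "topic.foo" is a valid Kafka topic name, but interpreted as a regex the
# dot matches any character -- a pattern subscription can't be told apart
# from a literal subscription by syntax alone.
pattern = re.compile("topic.foo")

print(bool(pattern.fullmatch("topic.foo")))   # True: the intended topic
print(bool(pattern.fullmatch("topicXfoo")))   # True: an unintended match
```

This is why separate option names ("subscribe" vs "subscribePattern") rather than a single string-typed option remove the ambiguity.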
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573395#comment-15573395 ] Cody Koeninger commented on SPARK-17812:
1. We don't have lists, we have strings. Regexes and valid topic names overlap (dot is the obvious one).
2. Mapping directly to Kafka method names means we don't have to come up with some other (weird and possibly overlapping) name when they add more ways to subscribe; we just use theirs.
3. I think this is a mess with Kafka semantics for the reasons both you and I have already expressed. At any rate, I think Michael already clearly punted the "starting X" case to a different ticket.
4. I think it's more than sufficiently clear as suggested; no one is going to expect that a specific offset they provided will be overruled by a general single default. The implementation is also crystal clear: seek to the position identified by startingTime, then seek to any specific offsets for specific partitions.

Yes, this is all bikeshedding, but it's bikeshedding that directly affects what people are actually able to do with the API. Needlessly restricting it for reasons that have nothing to do with safety is just going to piss users off for no reason. Just because you don't have a use case that needs it doesn't mean you should arbitrarily prevent users from doing it. Please, just choose something and let me build it so that people can actually use the thing by the next release.
[jira] [Issue Comment Deleted] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-17812:
---
Comment: was deleted
(was: One other slightly ugly thing...
{noformat}
// starting topicpartitions, no explicit offset
.option("assign", """{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
// do you allow specifying explicit offsets in the same config option?
// or force it all into startingOffsetForRealzYo?
.option("assign", """{ "topicfoo" : { "0": 1234, "1": 4567 }, "topicbar" : { "0": 1234, "1": 4567 }}""")
{noformat})
[jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573166#comment-15573166 ] Cody Koeninger edited comment on SPARK-17812 at 10/13/16 9:17 PM:
---
Here's my concrete suggestion.

3 mutually exclusive ways of subscribing:
{noformat}
.option("subscribe", "topicFoo,topicBar")
.option("subscribePattern", "topic.*")
.option("assign", """{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}
where assign can only be specified that way, with no inline offsets.

2 non-mutually exclusive ways of specifying the starting position, where explicit startingOffsets take priority:
{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1": 4567}}""")
.option("startingTime", "earliest" | "latest" | long)
{noformat}
where long is a timestamp; work to be done on that later. Note that even Kafka 0.8 has an API for time (a really crappy one, based on log file modification time), so adding startingTime now doesn't preclude pursuing timestamps later.

was (Author: c...@koeninger.org):
Here's my concrete suggestion.

3 mutually exclusive ways of subscribing:
{noformat}
.option("subscribe", "topicFoo,topicBar")
.option("subscribePattern", "topic.*")
.option("assign", """{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}
where assign can only be specified that way, with no inline offsets.

2 non-mutually exclusive ways of specifying the starting position, where explicit startingOffsets take priority:
{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1": 4567}""")
.option("startingTime", "earliest" | "latest" | long)
{noformat}
where long is a timestamp; work to be done on that later. Note that even Kafka 0.8 has an API for time (a really crappy one, based on log file modification time), so adding startingTime now doesn't preclude pursuing timestamps later.
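The option values in the suggestion above are plain JSON, so the source side can decode them with any JSON library. A hedged sketch in Python of what parsing the startingOffsets value might look like (note that partition ids arrive as JSON object keys, hence strings):

```python
import json

# startingOffsets in the shape suggested above: topic -> partition -> offset
raw = '{"topicFoo": {"0": 1234, "1": 4567}}'

# Normalize into (topic, partition) -> offset, converting partition keys to ints.
offsets = {(topic, int(part)): offset
           for topic, parts in json.loads(raw).items()
           for part, offset in parts.items()}

print(offsets[("topicFoo", 0)])  # 1234
print(offsets[("topicFoo", 1)])  # 4567
```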
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573166#comment-15573166 ] Cody Koeninger commented on SPARK-17812:
Here's my concrete suggestion.

3 mutually exclusive ways of subscribing:
{noformat}
.option("subscribe", "topicFoo,topicBar")
.option("subscribePattern", "topic.*")
.option("assign", """{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
{noformat}
where assign can only be specified that way, with no inline offsets.

2 non-mutually exclusive ways of specifying the starting position, where explicit startingOffsets take priority:
{noformat}
.option("startingOffsets", """{"topicFoo": {"0": 1234, "1": 4567}}""")
.option("startingTime", "earliest" | "latest" | long)
{noformat}
where long is a timestamp; work to be done on that later. Note that even Kafka 0.8 has an API for time (a really crappy one, based on log file modification time), so adding startingTime now doesn't preclude pursuing timestamps later.
[jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572922#comment-15572922 ] Cody Koeninger edited comment on SPARK-17812 at 10/13/16 8:33 PM:
---
Sorry, I didn't see this comment until just now. X offsets back per partition is not a reasonable proxy for time when you're dealing with a stream that has multiple topics in it. Agreed we should break that out and focus on defining starting offsets in this ticket.

The concern with the startingOffsets naming is that, because auto.offset.reset is orthogonal to specifying some offsets, you end up with a situation like this:
{noformat}
.format("kafka")
.option("subscribePattern", "topic.*")
.option("startingOffset", "latest")
.option("startingOffsetForRealzYo", """{ "topicfoo" : { "0": 1234, "1": 4567 }, "topicbar" : { "0": 1234, "1": 4567 }}""")
{noformat}
where startingOffsetForRealzYo stands in for some more reasonable name that conveys it is specifying starting offsets, yet is not confusingly similar to startingOffset.

Trying to hack it all into one JSON as an alternative, with a "default" topic, means you're going to have to pick a key that isn't a valid topic, or add yet another layer of indirection. It also makes it harder to keep the format consistent with SPARK-17829 (which I agree seems like a good thing to keep consistent).

Obviously I think you should just change the name, but it's your show.

was (Author: c...@koeninger.org):
Sorry, I didn't see this comment until just now. X offsets back per partition is not a reasonable proxy for time when you're dealing with a stream that has multiple topics in it. Agreed we should break that out and focus on defining starting offsets in this ticket.
The concern with the startingOffsets naming is that, because auto.offset.reset is orthogonal to specifying some offsets, you end up with a situation like this:
.format("kafka")
.option("subscribePattern", "topic.*")
.option("startingOffset", "latest")
.option("startingOffsetForRealzYo", """{ "topicfoo" : { "0": 1234, "1": 4567 }, "topicbar" : { "0": 1234, "1": 4567 }}""")
where startingOffsetForRealzYo stands in for some more reasonable name that conveys it is specifying starting offsets, yet is not confusingly similar to startingOffset.
Trying to hack it all into one JSON as an alternative, with a "default" topic, means you're going to have to pick a key that isn't a valid topic, or add yet another layer of indirection. It also makes it harder to keep the format consistent with SPARK-17829 (which I agree seems like a good thing to keep consistent).
Obviously I think you should just change the name, but it's your show.
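The precedence behavior under discussion (explicit per-partition offsets overruling a single general default) is simple to state in code. A hypothetical sketch, with the function and names invented for illustration rather than taken from Spark's actual implementation:

```python
# Resolve where to start reading a partition: an explicit offset, if one
# was provided for that (topic, partition), wins over the general default.
def resolve_start(partition, default_position, explicit_offsets):
    return explicit_offsets.get(partition, default_position)

explicit = {("topicfoo", 0): 1234, ("topicfoo", 1): 4567}

print(resolve_start(("topicfoo", 0), "latest", explicit))  # 1234
print(resolve_start(("topicbar", 0), "latest", explicit))  # latest
```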
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573089#comment-15573089 ] Cody Koeninger commented on SPARK-17812:
One other slightly ugly thing...
{noformat}
// starting topicpartitions, no explicit offset
.option("assign", """{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
// do you allow specifying explicit offsets in the same config option?
// or force it all into startingOffsetForRealzYo?
.option("assign", """{ "topicfoo" : { "0": 1234, "1": 4567 }, "topicbar" : { "0": 1234, "1": 4567 }}""")
{noformat}
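The ambiguity raised above (bare partition lists vs. inline offsets under the same "assign" key) could at least be detected mechanically, since the two shapes differ in JSON value type. A hedged sketch in Python, not actual Spark code:

```python
import json

# Distinguish the two candidate "assign" payload shapes quoted above:
#   {"topic": [0, 1]}            -> bare partition list, no offsets
#   {"topic": {"0": 1234, ...}}  -> per-partition starting offsets inline
def has_inline_offsets(assign_json):
    return all(isinstance(v, dict) for v in json.loads(assign_json).values())

print(has_inline_offsets('{"topicfoo": [0, 1], "topicbar": [0, 1]}'))  # False
print(has_inline_offsets('{"topicfoo": {"0": 1234, "1": 4567}}'))      # True
```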
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572922#comment-15572922 ] Cody Koeninger commented on SPARK-17812:
Sorry, I didn't see this comment until just now. X offsets back per partition is not a reasonable proxy for time when you're dealing with a stream that has multiple topics in it. Agreed we should break that out and focus on defining starting offsets in this ticket.

The concern with the startingOffsets naming is that, because auto.offset.reset is orthogonal to specifying some offsets, you end up with a situation like this:
{noformat}
.format("kafka")
.option("subscribePattern", "topic.*")
.option("startingOffset", "latest")
.option("startingOffsetForRealzYo", """{ "topicfoo" : { "0": 1234, "1": 4567 }, "topicbar" : { "0": 1234, "1": 4567 }}""")
{noformat}
where startingOffsetForRealzYo stands in for some more reasonable name that conveys it is specifying starting offsets, yet is not confusingly similar to startingOffset.

Trying to hack it all into one JSON as an alternative, with a "default" topic, means you're going to have to pick a key that isn't a valid topic, or add yet another layer of indirection. It also makes it harder to keep the format consistent with SPARK-17829 (which I agree seems like a good thing to keep consistent).

Obviously I think you should just change the name, but it's your show.
[jira] [Commented] (SPARK-17900) Mark the following Spark SQL APIs as stable
[ https://issues.apache.org/jira/browse/SPARK-17900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572119#comment-15572119 ] Cody Koeninger commented on SPARK-17900: Thanks for doing this, should make things clearer. Those divisions make sense to me. > Mark the following Spark SQL APIs as stable > --- > > Key: SPARK-17900 > URL: https://issues.apache.org/jira/browse/SPARK-17900 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Reynold Xin > > Mark the following stable: > Dataset/DataFrame > - functions, since 1.3 > - ColumnName, since 1.3 > - DataFrameNaFunctions, since 1.3.1 > - DataFrameStatFunctions, since 1.4 > - UserDefinedFunction, since 1.3 > - UserDefinedAggregateFunction, since 1.5 > - Window and WindowSpec, since 1.4 > Data sources: > - DataSourceRegister, since 1.5 > - RelationProvider, since 1.3 > - SchemaRelationProvider, since 1.3 > - CreatableRelationProvider, since 1.3 > - BaseRelation, since 1.3 > - TableScan, since 1.3 > - PrunedScan, since 1.3 > - PrunedFilteredScan, since 1.3 > - InsertableRelation, since 1.3 > Keep the following experimental / evolving: > Data sources: > - CatalystScan (tied to internal logical plans so it is not stable by > definition) > Structured streaming: > - all classes (introduced new in 2.0 and will likely change) > Dataset typed operations (introduced in 1.6 and 2.0 and might change, > although probability is low) > - all typed methods on Dataset > - KeyValueGroupedDataset > - o.a.s.sql.expressions.javalang.typed > - o.a.s.sql.expressions.scalalang.typed > - methods that return typed Dataset in SparkSession -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-15408) Spark streaming app crashes with NotLeaderForPartitionException
[ https://issues.apache.org/jira/browse/SPARK-15408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger closed SPARK-15408.
--
Resolution: Cannot Reproduce

> Spark streaming app crashes with NotLeaderForPartitionException
> 
> Key: SPARK-15408
> URL: https://issues.apache.org/jira/browse/SPARK-15408
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.6.0
> Environment: Ubuntu 64 bit
> Reporter: Johny Mathew
> Priority: Critical
>
> We have a spark streaming application reading from kafka (with the Kafka Direct
> API) and it crashed with the exception shown below. We have a
> 5 node kafka cluster with 19 partitions (replication factor 3). Even though
> the spark application crashed, the other kafka consumer apps were running
> fine. Only one of the 5 kafka nodes was not working correctly (it did not go
> down).
> /opt/hadoop/bin/yarn application -status application_1463151451543_0007
> 16/05/13 20:09:56 INFO client.RMProxy: Connecting to ResourceManager at /172.16.130.189:8050
> Application Report :
> Application-Id : application_1463151451543_0007
> Application-Name : com.ibm.alchemy.eventgen.EventGenMetrics
> Application-Type : SPARK
> User : stack
> Queue : default
> Start-Time : 1463155034571
> Finish-Time : 1463155310520
> Progress : 100%
> State : FINISHED
> Final-State : FAILED
> Tracking-URL : N/A
> RPC Port : 0
> AM Host : 172.16.130.188
> Aggregate Resource Allocation : 9562329 MB-seconds, 2393 vcore-seconds
> Diagnostics : User class threw exception: org.apache.spark.SparkException:
> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> kafka.common.NotLeaderForPartitionException,
> org.apache.spark.SparkException: Couldn't find leader offsets for Set([alchemy-metrics,17],
> [alchemy-metrics,10], [alchemy-metrics,3], [alchemy-metrics,4],
> [alchemy-metrics,9], [alchemy-metrics,15], [alchemy-metrics,18],
> [alchemy-metrics,5]))
> We cleared the checkpoint and started the application but it crashed again. Then
> at the end we found the misbehaving kafka node and restarted it, which
> fixed the problem.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15272) DirectKafkaInputDStream doesn't work with window operation
[ https://issues.apache.org/jira/browse/SPARK-15272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570221#comment-15570221 ] Cody Koeninger commented on SPARK-15272:
Checking to see if the 0.10 consumer's handling of preferred locations http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies addresses this.

> DirectKafkaInputDStream doesn't work with window operation
> --
>
> Key: SPARK-15272
> URL: https://issues.apache.org/jira/browse/SPARK-15272
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.2
> Reporter: Lubomir Nerad
>
> Using a Kafka direct {{DStream}} with a simple window operation like:
> {code:java}
> kafkaDStream.window(Durations.milliseconds(1), Durations.milliseconds(1000))
>     .print();
> {code}
> with a 1s batch duration either freezes after several seconds or lags terribly
> (depending on cluster mode).
> This happens when the Kafka brokers are not part of the Spark cluster (they are
> on different nodes). The {{KafkaRDD}} still reports them as preferred
> locations. This doesn't seem to be a problem in non-window scenarios, but with
> a window it conflicts with the delay scheduling algorithm implemented in
> {{TaskSetManager}}. It either significantly delays (Yarn mode) or completely
> drains (Spark mode) resource offers with {{TaskLocality.ANY}}, which are
> needed to process tasks with these Kafka-broker-aligned preferred locations.
> When the delay scheduling algorithm is switched off ({{spark.locality.wait=0}}),
> the example works correctly.
> I think that the {{KafkaRDD}} shouldn't report preferred locations if the
> brokers don't correspond to worker nodes, or should allow the reporting of
> preferred locations to be switched off. Also it would be good if the delay
> scheduling algorithm didn't drain / delay offers in cases where tasks have
> unmatched preferred locations.
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-15272) DirectKafkaInputDStream doesn't work with window operation
[ https://issues.apache.org/jira/browse/SPARK-15272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570221#comment-15570221 ] Cody Koeninger edited comment on SPARK-15272 at 10/12/16 11:33 PM:
---
Does the 0.10 consumer's handling of preferred locations http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies address this for you?

was (Author: c...@koeninger.org):
Checking to see if the 0.10 consumer's handling of preferred locations http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies addresses this.
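The workaround the reporter found, spark.locality.wait=0, effectively disables delay scheduling and can be applied without code changes, for example at submit time. A sketch, with the application jar name as a placeholder:

```shell
# Don't wait for executors matching the (non-worker) Kafka broker
# preferred locations; offer tasks at TaskLocality.ANY immediately.
spark-submit --conf spark.locality.wait=0 your-streaming-app.jar
```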
[jira] [Commented] (SPARK-11698) Add option to ignore kafka messages that are out of limit rate
[ https://issues.apache.org/jira/browse/SPARK-11698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570208#comment-15570208 ] Cody Koeninger commented on SPARK-11698:
Would a custom ConsumerStrategy for the new consumer added in SPARK-12177 allow you to address this issue? You could supply a Consumer implementation that overrides poll.

> Add option to ignore kafka messages that are out of limit rate
> --
>
> Key: SPARK-11698
> URL: https://issues.apache.org/jira/browse/SPARK-11698
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Reporter: Liang-Chi Hsieh
>
> With spark.streaming.kafka.maxRatePerPartition, we can control the max rate
> limit. However, we cannot ignore the messages over the limit; those messages
> will be consumed in the next iteration. We have a use case where we need to
> ignore these messages and process the latest messages in the next iteration.
> In other words, we simply want to consume part of the messages in each
> iteration and ignore the remaining messages that were not consumed.
> We add an option for this purpose.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context
[ https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger resolved SPARK-10320.
--
Resolution: Fixed
Fix Version/s: 2.0.0

SPARK-12177 added the new consumer, which supports SubscribePattern.

> Kafka Support new topic subscriptions without requiring restart of the
> streaming context
>
> Key: SPARK-10320
> URL: https://issues.apache.org/jira/browse/SPARK-10320
> Project: Spark
> Issue Type: New Feature
> Components: Streaming
> Reporter: Sudarshan Kadambi
> Fix For: 2.0.0
>
> Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe
> from current ones once the streaming context has been started. Restarting the
> streaming context increases the latency of update handling.
> Consider a streaming application subscribed to n topics. Let's say one of the
> topics is no longer needed in streaming analytics and hence should be
> dropped. We could do this by stopping the streaming context, removing that
> topic from the topic list and restarting the streaming context. Since with
> some DStreams such as DirectKafkaStream the per-partition offsets are
> maintained by Spark, we should be able to resume uninterrupted (I think?)
> from where we left off, with a minor delay. However, in instances where
> expensive state initialization (from an external datastore) may be needed for
> datasets published to all topics before streaming updates can be applied, it
> is more convenient to subscribe or unsubscribe to only the incremental
> changes to the topic list. Without such a feature, updates go unprocessed for
> longer than they need to, thus affecting QoS.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-9947) Separate Metadata and State Checkpoint Data
[ https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger closed SPARK-9947. - Resolution: Won't Fix The direct DStream api already gives access to offsets, and it seems clear that most future work on streaming checkpointing is going to be focused on structured streaming. SPARK-15406 > Separate Metadata and State Checkpoint Data > --- > > Key: SPARK-9947 > URL: https://issues.apache.org/jira/browse/SPARK-9947 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow > Original Estimate: 168h > Remaining Estimate: 168h > > Problem: When updating an application that has checkpointing enabled to > support the updateStateByKey and 24/7 operation functionality, you encounter > the problem where you might like to maintain state data between restarts but > delete the metadata containing execution state. > If checkpoint data exists between code redeployment, the program may not > execute properly or at all. My current workaround for this issue is to wrap > updateStateByKey with my own function that persists the state after every > update to my own separate directory. (That allows me to delete the checkpoint > with its metadata before redeploying) Then, when I restart the application, I > initialize the state with this persisted data. This incurs additional > overhead due to persisting of the same data twice: once in the checkpoint and > once in my persisted data folder. > If Kafka Direct API offsets could be stored in another separate checkpoint > directory, that would help address the problem of having to blow that away > between code redeployment as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570186#comment-15570186 ] Cody Koeninger commented on SPARK-8337:
---
Can this be closed, given that the subtasks are resolved and any future discussion of Python DStream Kafka support seems to be in SPARK-16534?

> KafkaUtils.createDirectStream for python is lacking API/feature parity with
> the Scala/Java version
> --
>
> Key: SPARK-8337
> URL: https://issues.apache.org/jira/browse/SPARK-8337
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Streaming
> Affects Versions: 1.4.0
> Reporter: Amit Ramesh
> Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-5505) ConsumerRebalanceFailedException from Kafka consumer
[ https://issues.apache.org/jira/browse/SPARK-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger closed SPARK-5505. - Resolution: Won't Fix The old kafka High Level Consumer has been abandoned at this point. SPARK-12177 and SPARK-15406 use the new consumer api. > ConsumerRebalanceFailedException from Kafka consumer > > > Key: SPARK-5505 > URL: https://issues.apache.org/jira/browse/SPARK-5505 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.0 > Environment: CentOS6 / Linux 2.6.32-358.2.1.el6.x86_64 > java version "1.7.0_21" > Scala compiler version 2.9.3 > 2 cores Intel(R) Xeon(R) CPU E5620 @ 2.40GHz / 16G RAM > VMWare VM. >Reporter: Greg Temchenko >Priority: Critical > > From time to time Spark streaming produces a ConsumerRebalanceFailedException > and stops receiving messages. After that all consequential RDDs are empty. > {code} > 15/01/30 18:18:36 ERROR consumer.ZookeeperConsumerConnector: > [terran_vmname-1422670149779-243b4e10], error during syncedRebalance > kafka.common.ConsumerRebalanceFailedException: > terran_vmname-1422670149779-243b4e10 can't rebalance after 4 retries > at > kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) > at > kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355) > {code} > The problem is also described in the mailing list: > http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Spark-streaming-consumes-from-Kafka-td19570.html > As I understand it's a critical blocker for kafka-spark streaming production > use. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-5718) Add native offset management for ReliableKafkaReceiver
[ https://issues.apache.org/jira/browse/SPARK-5718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger resolved SPARK-5718. --- Resolution: Fixed Fix Version/s: 2.0.0 SPARK-12177 added support for the native kafka offset commit api > Add native offset management for ReliableKafkaReceiver > -- > > Key: SPARK-5718 > URL: https://issues.apache.org/jira/browse/SPARK-5718 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Saisai Shao > Fix For: 2.0.0 > > > Kafka 0.8.2 supports native offsets management instead of ZK, this will get > better performance, for now in ReliableKafkaReceiver, we rely on ZK to manage > the offsets, this potentially will be a bottleneck if the injection rate is > high (once per 200ms by default), so here in order to get better performance > as well as keeping consistent with Kafka, add native offset management for > ReliableKafkaReceiver. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10815) API design: data sources and sinks
[ https://issues.apache.org/jira/browse/SPARK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570139#comment-15570139 ] Cody Koeninger commented on SPARK-10815: Another unfortunate thing about the Sink api is that it only exposes batch ids, with no way that I'm aware of to get at (e.g. Kafka) offsets. Access to offsets for sinks that can take advantage of it would be preferable, as it's better for disaster recovery and doesn't lock you in to a particular streaming engine. > API design: data sources and sinks > -- > > Key: SPARK-10815 > URL: https://issues.apache.org/jira/browse/SPARK-10815 > Project: Spark > Issue Type: Sub-task > Components: SQL, Streaming >Reporter: Reynold Xin > > The existing (in 2.0) source/sink interface for structured streaming depends > on RDDs. This dependency has two issues: > 1. The RDD interface is wide and difficult to stabilize across versions. This > is similar to point 1 in https://issues.apache.org/jira/browse/SPARK-15689. > Ideally, a source/sink implementation created for Spark 2.x should work in > Spark 10.x, assuming the JVM is still around. > 2. It is difficult to swap in/out a different execution engine. > The purpose of this ticket is to create a stable interface that addresses the > above two. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17344) Kafka 0.8 support for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15567457#comment-15567457 ] Cody Koeninger commented on SPARK-17344: Given the choice between rewriting underlying kafka consumers and having a split codebase, I'd rather have a split codebase. Of course I'd rather not sink development effort into an old version of kafka at all, until the structured stream for 0.10 is working for my use cases. But if you want to wrap the 0.8 rdd in a structured stream, go for it, I'll help you figure out how to do it. Seriously. Don't expect larger project uptake, but if you just need something to work for you > Kafka 0.8 support for Structured Streaming > -- > > Key: SPARK-17344 > URL: https://issues.apache.org/jira/browse/SPARK-17344 > Project: Spark > Issue Type: Sub-task > Components: Streaming >Reporter: Frederick Reiss > > Design and implement Kafka 0.8-based sources and sinks for Structured > Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-17837) Disaster recovery of offsets from WAL
[ https://issues.apache.org/jira/browse/SPARK-17837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger closed SPARK-17837. -- Resolution: Duplicate Duplicate of SPARK-17829 > Disaster recovery of offsets from WAL > - > > Key: SPARK-17837 > URL: https://issues.apache.org/jira/browse/SPARK-17837 > Project: Spark > Issue Type: Sub-task >Reporter: Cody Koeninger > > "The SQL offsets are stored in a WAL at $checkpointLocation/offsets/$batchId. > As reynold suggests though, we should change this to use a less opaque > format." -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17344) Kafka 0.8 support for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15566244#comment-15566244 ] Cody Koeninger commented on SPARK-17344: How long would it take CDH to distribute 0.10 if there was a compelling Spark client for it? How are you going to handle SSL? You can't avoid the complexity of caching consumers if you still want the benefits of prefetching, and doing an SSL handshake for every batch will kill performance if they aren't cached. Also note that this is a pretty prime example of what I'm talking about in my dev mailing list discussion on SIPs. This issue has been brought up multiple times, and each time the decision has gone against continuing support of 0.8. You guys started making promises about structured streaming for Kafka over half a year ago, and still don't have it feature complete. This is a big potential detour for uncertain gain. The real underlying problem is still how you're going to do better than simply wrapping a DStream, and I don't see how this is directly relevant. > Kafka 0.8 support for Structured Streaming > -- > > Key: SPARK-17344 > URL: https://issues.apache.org/jira/browse/SPARK-17344 > Project: Spark > Issue Type: Sub-task > Components: Streaming >Reporter: Frederick Reiss > > Design and implement Kafka 0.8-based sources and sinks for Structured > Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17853) Kafka OffsetOutOfRangeException on DStreams union from separate Kafka clusters with identical topic names.
[ https://issues.apache.org/jira/browse/SPARK-17853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565559#comment-15565559 ] Cody Koeninger commented on SPARK-17853: Good, will keep this ticket open at least until documentation is made clearer. > Kafka OffsetOutOfRangeException on DStreams union from separate Kafka > clusters with identical topic names. > -- > > Key: SPARK-17853 > URL: https://issues.apache.org/jira/browse/SPARK-17853 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Marcin Kuthan > > During migration from Spark 1.6 to 2.0 I observed OffsetOutOfRangeException > reported by Kafka client. In our scenario we create single DStream as a union > of multiple DStreams. One DStream for one Kafka cluster (multi dc solution). > Both Kafka clusters have the same topics and number of partitions. > After quick investigation, I found that class DirectKafkaInputDStream keeps > offset state for topic and partitions, but it is not aware of different Kafka > clusters. > For every topic, single DStream is created as a union from all configured > Kafka clusters. > {code} > class KafkaDStreamSource(configs: Iterable[Map[String, String]]) { > def createSource(ssc: StreamingContext, topic: String): DStream[(String, > Array[Byte])] = { > val streams = configs.map { config => > val kafkaParams = config > val kafkaTopics = Set(topic) > KafkaUtils. > createDirectStream[String, Array[Byte]]( > ssc, > LocationStrategies.PreferConsistent, > ConsumerStrategies.Subscribe[String, Array[Byte]](kafkaTopics, > kafkaParams) > ).map { record => > (record.key, record.value) > } > } > ssc.union(streams.toSeq) > } > } > {code} > At the end, offsets from one Kafka cluster overwrite offsets from second one. > Fortunately OffsetOutOfRangeException was thrown because offsets in both > Kafka clusters are significantly different.
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17853) Kafka OffsetOutOfRangeException on DStreams union from separate Kafka clusters with identical topic names.
[ https://issues.apache.org/jira/browse/SPARK-17853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565400#comment-15565400 ] Cody Koeninger commented on SPARK-17853: Use a different group id. Let me know if that addresses the issue. > Kafka OffsetOutOfRangeException on DStreams union from separate Kafka > clusters with identical topic names. > -- > > Key: SPARK-17853 > URL: https://issues.apache.org/jira/browse/SPARK-17853 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Marcin Kuthan > > During migration from Spark 1.6 to 2.0 I observed OffsetOutOfRangeException > reported by Kafka client. In our scenario we create single DStream as a union > of multiple DStreams. One DStream for one Kafka cluster (multi dc solution). > Both Kafka clusters have the same topics and number of partitions. > After quick investigation, I found that class DirectKafkaInputDStream keeps > offset state for topic and partitions, but it is not aware of different Kafka > clusters. > For every topic, single DStream is created as a union from all configured > Kafka clusters. > {code} > class KafkaDStreamSource(configs: Iterable[Map[String, String]]) { > def createSource(ssc: StreamingContext, topic: String): DStream[(String, > Array[Byte])] = { > val streams = configs.map { config => > val kafkaParams = config > val kafkaTopics = Set(topic) > KafkaUtils. > createDirectStream[String, Array[Byte]]( > ssc, > LocationStrategies.PreferConsistent, > ConsumerStrategies.Subscribe[String, Array[Byte]](kafkaTopics, > kafkaParams) > ).map { record => > (record.key, record.value) > } > } > ssc.union(streams.toSeq) > } > } > {code} > At the end, offsets from one Kafka cluster overwrite offsets from second one. > Fortunately OffsetOutOfRangeException was thrown because offsets in both > Kafka clusters are significantly different. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
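The workaround suggested in the comment above, a distinct group.id per Kafka cluster, can be sketched in pure Scala. This is a minimal sketch, not part of any Spark API: the config shape mirrors the reporter's KafkaDStreamSource, and the "-cluster-N" suffixing scheme is illustrative.

```scala
// Sketch: derive a distinct group.id per Kafka cluster before building
// each direct stream, so offset state kept per consumer group for
// identically named topics cannot collide across clusters.
// Assumption: configs are per-cluster kafka param maps, as in the
// reporter's KafkaDStreamSource; the suffixing scheme is illustrative.
object DistinctGroupIds {
  def withDistinctGroupIds(configs: Seq[Map[String, String]]): Seq[Map[String, String]] =
    configs.zipWithIndex.map { case (config, i) =>
      val base = config.getOrElse("group.id", "spark-app")
      config + ("group.id" -> s"$base-cluster-$i")
    }
}
```

Each resulting map would then be passed to KafkaUtils.createDirectStream in place of the shared config.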
[jira] [Commented] (SPARK-17853) Kafka OffsetOutOfRangeException on DStreams union from separate Kafka clusters with identical topic names.
[ https://issues.apache.org/jira/browse/SPARK-17853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565278#comment-15565278 ] Cody Koeninger commented on SPARK-17853: Which version of DStream are you using, 0-10 or 0-8? Are you using the same group id for both streams? > Kafka OffsetOutOfRangeException on DStreams union from separate Kafka > clusters with identical topic names. > -- > > Key: SPARK-17853 > URL: https://issues.apache.org/jira/browse/SPARK-17853 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Marcin Kuthan > > During migration from Spark 1.6 to 2.0 I observed OffsetOutOfRangeException > reported by Kafka client. In our scenario we create single DStream as a union > of multiple DStreams. One DStream for one Kafka cluster (multi dc solution). > Both Kafka clusters have the same topics and number of partitions. > After quick investigation, I found that class DirectKafkaInputDStream keeps > offset state for topic and partitions, but it is not aware of different Kafka > clusters. > For every topic, single DStream is created as a union from all configured > Kafka clusters. > {code} > class KafkaDStreamSource(configs: Iterable[Map[String, String]]) { > def createSource(ssc: StreamingContext, topic: String): DStream[(String, > Array[Byte])] = { > val streams = configs.map { config => > val kafkaParams = config > val kafkaTopics = Set(topic) > KafkaUtils. > createDirectStream[String, Array[Byte]]( > ssc, > LocationStrategies.PreferConsistent, > ConsumerStrategies.Subscribe[String, Array[Byte]](kafkaTopics, > kafkaParams) > ).map { record => > (record.key, record.value) > } > } > ssc.union(streams.toSeq) > } > } > {code} > At the end, offsets from one Kafka cluster overwrite offsets from second one. > Fortunately OffsetOutOfRangeException was thrown because offsets in both > Kafka clusters are significantly different. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17812) More granular control of starting offsets
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15563285#comment-15563285 ] Cody Koeninger commented on SPARK-17812: No, it's not covered by strict assign. If you don't have this, you're basically saying you can never have well-defined starting offsets for a job that starts as SubscribePattern. The existing DStream already does this, because it doesn't conflate auto.offset.reset with user-specified offsets. > More granular control of starting offsets > - > > Key: SPARK-17812 > URL: https://issues.apache.org/jira/browse/SPARK-17812 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust > > Right now you can only run a Streaming Query starting from either the > earliest or latests offsets available at the moment the query is started. > Sometimes this is a lot of data. It would be nice to be able to do the > following: > - seek back {{X}} offsets in the stream from the moment the query starts > - seek to user specified offsets -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17812) More granular control of starting offsets
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560925#comment-15560925 ] Cody Koeninger commented on SPARK-17812: I want to start a pattern subscription at known good offsets for a particular topic, and latest available for other topics that match. > More granular control of starting offsets > - > > Key: SPARK-17812 > URL: https://issues.apache.org/jira/browse/SPARK-17812 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust > > Right now you can only run a Streaming Query starting from either the > earliest or latests offsets available at the moment the query is started. > Sometimes this is a lot of data. It would be nice to be able to do the > following: > - seek back {{X}} offsets in the stream from the moment the query starts > - seek to user specified offsets -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
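What is being asked for here, explicit offsets where known and the reset policy everywhere else, is essentially the resolution rule the 0-10 DStream applies when a ConsumerStrategy is given an offsets map alongside a pattern subscription. A pure-Scala sketch of that rule; the types and names are simplified stand-ins, not Spark's actual classes:

```scala
// Sketch of the offset-resolution rule: matched partitions with
// user-specified offsets start there; all other matched partitions
// fall back to the reset policy (modeled here as Latest).
object StartingOffsets {
  type TopicPartition = (String, Int) // stand-in for Kafka's TopicPartition

  sealed trait Start
  case object Latest extends Start
  final case class At(offset: Long) extends Start

  def resolve(matched: Set[TopicPartition],
              specified: Map[TopicPartition, Long]): Map[TopicPartition, Start] =
    matched.iterator.map { tp =>
      tp -> specified.get(tp).fold(Latest: Start)(At(_))
    }.toMap
}
```

Note that partitions matched later by the pattern, but absent from the specified map, simply resolve to the reset policy, which is the "known offsets for one topic, latest for the rest" behavior described above.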
[jira] [Commented] (SPARK-17812) More granular control of starting offsets
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560839#comment-15560839 ] Cody Koeninger commented on SPARK-17812: That totally kills the usability of SubscribePattern. > More granular control of starting offsets > - > > Key: SPARK-17812 > URL: https://issues.apache.org/jira/browse/SPARK-17812 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust > > Right now you can only run a Streaming Query starting from either the > earliest or latests offsets available at the moment the query is started. > Sometimes this is a lot of data. It would be nice to be able to do the > following: > - seek back {{X}} offsets in the stream from the moment the query starts > - seek to user specified offsets -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17815) Report committed offsets
[ https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560837#comment-15560837 ] Cody Koeninger commented on SPARK-17815: My personal concerns about complexity are because I'm the one who answers nearly every email on the spark user list regarding the Kafka integration. My business need relative to this ticket is that any Kafka integration I use must give me a way to transactionally store offsets and results. If those downstream offsets aren't used as at least a source of truth, then that goal isn't met. If this ticket is implicitly saying WAL is the only source of truth, that's a problem for me. SIP is what it is or may not be, we're where we are now having the discussion we're having now, and I'll leave it at that ;) > Report committed offsets > > > Key: SPARK-17815 > URL: https://issues.apache.org/jira/browse/SPARK-17815 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust > > Since we manage our own offsets, we have turned off auto-commit. However, > this means that external tools are not able to report on how far behind a > given streaming job is. When the user manually gives us a group.id, we > should report back to it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17812) More granular control of starting offsets
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560719#comment-15560719 ] Cody Koeninger commented on SPARK-17812: Generally agree with the direction of what you're saying, but the question of where to start for partitions that don't have offsets specified is orthogonal to the question of where/how to specify offsets for particular partitions. It's not just 4 options. > More granular control of starting offsets > - > > Key: SPARK-17812 > URL: https://issues.apache.org/jira/browse/SPARK-17812 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust > > Right now you can only run a Streaming Query starting from either the > earliest or latests offsets available at the moment the query is started. > Sometimes this is a lot of data. It would be nice to be able to do the > following: > - seek back {{X}} offsets in the stream from the moment the query starts > - seek to user specified offsets -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17815) Report committed offsets
[ https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560684#comment-15560684 ] Cody Koeninger commented on SPARK-17815: Regarding kafka consumer behavior, I'm not saying it's impossible, I'm saying it A. Needs attention, especially since what's in master isn't complete and B. May confuse users who expect the offsets in Kafka to determine where the consumer starts. Regarding the WAL, DStream checkpoints were in HDFS too. Being on HDFS doesn't guarantee against getting screwed up. And in any case, the boundary alignment of multiple different offset stores still applies. > Report committed offsets > > > Key: SPARK-17815 > URL: https://issues.apache.org/jira/browse/SPARK-17815 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust > > Since we manage our own offsets, we have turned off auto-commit. However, > this means that external tools are not able to report on how far behind a > given streaming job is. When the user manually gives us a group.id, we > should report back to it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17841) Kafka 0.10 commitQueue needs to be drained
Cody Koeninger created SPARK-17841: -- Summary: Kafka 0.10 commitQueue needs to be drained Key: SPARK-17841 URL: https://issues.apache.org/jira/browse/SPARK-17841 Project: Spark Issue Type: Bug Reporter: Cody Koeninger Current implementation is just iterating, not polling and removing. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
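The fix this ticket describes can be sketched as follows, assuming the commit queue is a java.util.concurrent.ConcurrentLinkedQueue as in the 0-10 DStream; the element type is simplified here (the real queue holds OffsetRange values):

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Sketch: drain the async commit queue with poll(), which removes
// entries, instead of merely iterating over it, so offsets committed
// in one batch are not re-sent on every subsequent batch.
object DrainQueue {
  def drain[A <: AnyRef](queue: ConcurrentLinkedQueue[A]): List[A] = {
    val out = List.newBuilder[A]
    var next = queue.poll() // poll() removes and returns the head, or null if empty
    while (next != null) {
      out += next
      next = queue.poll()
    }
    out.result()
  }
}
```

The drained list would then be handed to the consumer's commitAsync in one call, leaving the queue empty for the next batch.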
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560241#comment-15560241 ] Cody Koeninger commented on SPARK-17147: [~graphex] My WIP is at https://github.com/koeninger/spark-1/tree/SPARK-17147 Still really rough, but it's passing the basic test I set up. Let me know at what point you have time for trying it out & we can iterate. > Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets > > > Key: SPARK-17147 > URL: https://issues.apache.org/jira/browse/SPARK-17147 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Robert Conrad > > When Kafka does log compaction offsets often end up with gaps, meaning the > next requested offset will be frequently not be offset+1. The logic in > KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset > will always be just an increment of 1 above the previous offset. > I have worked around this problem by changing CachedKafkaConsumer to use the > returned record's offset, from: > {{nextOffset = offset + 1}} > to: > {{nextOffset = record.offset + 1}} > and changed KafkaRDD from: > {{requestOffset += 1}} > to: > {{requestOffset = r.offset() + 1}} > (I also had to change some assert logic in CachedKafkaConsumer). > There's a strong possibility that I have misconstrued how to use the > streaming kafka consumer, and I'm happy to close this out if that's the case. > If, however, it is supposed to support non-consecutive offsets (e.g. due to > log compaction) I am also happy to contribute a PR. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
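For reference, the gap-tolerant next-offset rule that the reporter's workaround amounts to can be shown with plain data. Record below is a stand-in for Kafka's ConsumerRecord; this is a sketch of the rule, not the actual KafkaRDD or CachedKafkaConsumer code:

```scala
// Sketch: with a compacted topic a fetched batch can have offset gaps
// (e.g. 0, 3, 7), so the next fetch position must come from the last
// record actually returned, not from an assumed offset + 1 per message.
object CompactedOffsets {
  final case class Record(offset: Long, value: String) // stand-in for ConsumerRecord

  // Position to request next: one past the last returned offset,
  // or the unchanged start position if the batch was empty.
  def nextOffset(batch: Seq[Record], fromOffset: Long): Long =
    batch.lastOption.fold(fromOffset)(_.offset + 1)
}
```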
[jira] [Commented] (SPARK-17815) Report committed offsets
[ https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15559911#comment-15559911 ] Cody Koeninger commented on SPARK-17815: The WAL cannot be the only source of truth, because it can be corrupted in a situation where the downstream results and offsets are not. The downstream offsets by contrast can't be corrupted without also affecting the results, that's the whole point of transactions. Even if you do ignore the fact that the WAL can be corrupted, you still have to be careful about aligning boundaries of the WAL with boundaries of the downstream store. The kafka commit log can't be ignored as merely for metric collection either. A kafka consumer is going to use it in preference to auto.offset.reset as the starting point for a newly constructed consumer. I'm not saying these issues are unsolvable, but you can't just handwave them away, and they are confusing to end users. There was already confusion with only 2 stores - ZK and the dstream checkpoint. > Report committed offsets > > > Key: SPARK-17815 > URL: https://issues.apache.org/jira/browse/SPARK-17815 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust > > Since we manage our own offsets, we have turned off auto-commit. However, > this means that external tools are not able to report on how far behind a > given streaming job is. When the user manually gives us a group.id, we > should report back to it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
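The transactional pattern argued for in this thread, results and the offsets that produced them committed together so the downstream store is the single source of truth, can be sketched with an in-memory stand-in for a database transaction. Everything below is illustrative, not Spark or Kafka API:

```scala
// Sketch: a batch's results and its ending offset land atomically, or
// an exception leaves the state untouched (as a DB rollback would).
// A replayed batch is detected because its starting offset no longer
// matches the stored offset, which is what makes output exactly-once.
object TransactionalSink {
  final case class State(results: Vector[String], offsets: Map[Int, Long])

  def commitBatch(state: State, partition: Int, fromOffset: Long,
                  records: Seq[(Long, String)]): State = {
    val expected = state.offsets.getOrElse(partition, 0L)
    require(fromOffset == expected,
      s"batch starts at $fromOffset but stored offset is $expected: duplicate or gap")
    val untilOffset = records.lastOption.fold(fromOffset)(_._1 + 1)
    State(state.results ++ records.map(_._2),
          state.offsets + (partition -> untilOffset))
  }
}
```

In a real sink the State updates would be two writes inside one database transaction; the point is that neither the WAL nor the Kafka commit topic needs to be trusted for correctness, only for monitoring and recovery hints.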
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558572#comment-15558572 ] Cody Koeninger commented on SPARK-17147: I talked with Sean in person about this, and think there's a way to move forward. I'll start hacking on it. > Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets > > > Key: SPARK-17147 > URL: https://issues.apache.org/jira/browse/SPARK-17147 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Robert Conrad > > When Kafka does log compaction offsets often end up with gaps, meaning the > next requested offset will be frequently not be offset+1. The logic in > KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset > will always be just an increment of 1 above the previous offset. > I have worked around this problem by changing CachedKafkaConsumer to use the > returned record's offset, from: > {{nextOffset = offset + 1}} > to: > {{nextOffset = record.offset + 1}} > and changed KafkaRDD from: > {{requestOffset += 1}} > to: > {{requestOffset = r.offset() + 1}} > (I also had to change some assert logic in CachedKafkaConsumer). > There's a strong possibility that I have misconstrued how to use the > streaming kafka consumer, and I'm happy to close this out if that's the case. > If, however, it is supposed to support non-consecutive offsets (e.g. due to > log compaction) I am also happy to contribute a PR. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4960) Interceptor pattern in receivers
[ https://issues.apache.org/jira/browse/SPARK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558557#comment-15558557 ] Cody Koeninger commented on SPARK-4960: --- Is this idea pretty much dead at this point? It seems like most attention has moved off of receiver-based dstream. > Interceptor pattern in receivers > > > Key: SPARK-4960 > URL: https://issues.apache.org/jira/browse/SPARK-4960 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Tathagata Das > > Sometimes it is good to intercept a message received through a receiver and > modify / do something with the message before it is stored into Spark. This > is often referred to as the interceptor pattern. There should be general way > to specify an interceptor function that gets applied to all receivers. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-3146) Improve the flexibility of Spark Streaming Kafka API to offer user the ability to process message before storing into BM
[ https://issues.apache.org/jira/browse/SPARK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger resolved SPARK-3146. --- Resolution: Fixed Fix Version/s: 1.3.0 > Improve the flexibility of Spark Streaming Kafka API to offer user the > ability to process message before storing into BM > > > Key: SPARK-3146 > URL: https://issues.apache.org/jira/browse/SPARK-3146 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.2, 1.1.0 >Reporter: Saisai Shao > Fix For: 1.3.0 > > > Currently Spark Streaming Kafka API stores the key and value of each message > into BM for processing, potentially this may lose the flexibility for > different requirements: > 1. currently topic/partition/offset information for each message is discarded > by KafkaInputDStream. In some scenarios people may need this information to > better filter the message, like SPARK-2388 described. > 2. People may need to add timestamp for each message when feeding into Spark > Streaming, which can better measure the system latency. > 3. Checkpointing the partition/offsets or others... > So here we add a messageHandler in interface to give people the flexibility > to preprocess the message before storing into BM. In the meantime time this > improvement keep compatible with current API. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3146) Improve the flexibility of Spark Streaming Kafka API to offer user the ability to process message before storing into BM
[ https://issues.apache.org/jira/browse/SPARK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558550#comment-15558550 ] Cody Koeninger commented on SPARK-3146: --- SPARK-4964 / the direct stream added a messageHandler. > Improve the flexibility of Spark Streaming Kafka API to offer user the > ability to process message before storing into BM > > > Key: SPARK-3146 > URL: https://issues.apache.org/jira/browse/SPARK-3146 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.2, 1.1.0 >Reporter: Saisai Shao > > Currently Spark Streaming Kafka API stores the key and value of each message > into BM for processing, potentially this may lose the flexibility for > different requirements: > 1. currently topic/partition/offset information for each message is discarded > by KafkaInputDStream. In some scenarios people may need this information to > better filter the message, like SPARK-2388 described. > 2. People may need to add timestamp for each message when feeding into Spark > Streaming, which can better measure the system latency. > 3. Checkpointing the partition/offsets or others... > So here we add a messageHandler in interface to give people the flexibility > to preprocess the message before storing into BM. In the meantime time this > improvement keep compatible with current API. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17837) Disaster recovery of offsets from WAL
[ https://issues.apache.org/jira/browse/SPARK-17837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cody Koeninger updated SPARK-17837:
---
Summary: Disaster recovery of offsets from WAL (was: Disaster recover of offsets from WAL)

> Disaster recovery of offsets from WAL
>
>
> Key: SPARK-17837
> URL: https://issues.apache.org/jira/browse/SPARK-17837
> Project: Spark
> Issue Type: Sub-task
> Reporter: Cody Koeninger
>
> "The SQL offsets are stored in a WAL at $checkpointLocation/offsets/$batchId.
> As reynold suggests though, we should change this to use a less opaque
> format."
[jira] [Created] (SPARK-17837) Disaster recover of offsets from WAL
Cody Koeninger created SPARK-17837:
---
Summary: Disaster recover of offsets from WAL
Key: SPARK-17837
URL: https://issues.apache.org/jira/browse/SPARK-17837
Project: Spark
Issue Type: Sub-task
Reporter: Cody Koeninger

"The SQL offsets are stored in a WAL at $checkpointLocation/offsets/$batchId. As reynold suggests though, we should change this to use a less opaque format."
[jira] [Commented] (SPARK-17815) Report committed offsets
[ https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558528#comment-15558528 ]

Cody Koeninger commented on SPARK-17815:
---
So if you start committing offsets to kafka, there are going to be potentially three places offsets are stored:
1. structured WAL
2. kafka commit topic
3. downstream store
It's going to be easy to get confused as to what the source of truth is.

> Report committed offsets
>
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit. However,
> this means that external tools are not able to report on how far behind a
> given streaming job is. When the user manually gives us a group.id, we
> should report back to it.
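The "three places offsets are stored" concern above can be made concrete with a small reconciliation check: given the offsets recorded in the structured WAL, committed back to Kafka, and stored with the results downstream, flag any partition where the views disagree. All names and the `{(topic, partition): offset}` shape here are hypothetical, not a Spark or Kafka API.

```python
# Compare three independent views of per-partition offsets and report any
# topic-partition where the non-missing views disagree.
def find_offset_disagreements(wal, kafka_committed, downstream):
    disagreements = {}
    for tp in set(wal) | set(kafka_committed) | set(downstream):
        views = {
            "wal": wal.get(tp),
            "kafka": kafka_committed.get(tp),
            "downstream": downstream.get(tp),
        }
        # More than one distinct non-missing offset means the stores conflict.
        if len({v for v in views.values() if v is not None}) > 1:
            disagreements[tp] = views
    return disagreements

# The WAL and downstream store agree, but the Kafka commit topic lags behind.
conflicts = find_offset_disagreements(
    wal={("topicfoo", 0): 100},
    kafka_committed={("topicfoo", 0): 90},
    downstream={("topicfoo", 0): 100},
)
```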
[jira] [Commented] (SPARK-17812) More granular control of starting offsets
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558506#comment-15558506 ]

Cody Koeninger commented on SPARK-17812:
---
So I'm willing to do this work, mostly because I've already done it, but there are some user interface issues here that need to get figured out. You already chose the name "startingOffset" for specifying the equivalent of auto.offset.reset. Now we're looking at actually adding starting offsets. Furthermore, it should be possible to specify starting offsets for some partitions while relying on the equivalent of auto.offset.reset for other, unspecified ones (the existing DStream does this). What are you expecting configuration of this to look like? I can see a couple of options:
1. Try to cram everything into startingOffset with some horrible string-based DSL.
2. Have a separate option for specifying starting offsets for real, with a name that makes it clear what it is, yet doesn't use "startingOffset". As for the value, I guess in json form of some kind? { "topicfoo" : { "0": 1234, "1": 4567 }}
Somewhat related is that Assign needs a way of specifying topicpartitions.
As far as the idea to seek back X offsets, I think it'd be better to look at offset time indexing. If you are going to do the X offsets back idea, the offsets -1L and -2L already have special meaning, so it's going to be kind of confusing to allow negative numbers in an interface that is specifying offsets.

> More granular control of starting offsets
>
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the
> earliest or latest offsets available at the moment the query is started.
> Sometimes this is a lot of data.
> It would be nice to be able to do the following:
> - seek back {{X}} offsets in the stream from the moment the query starts
> - seek to user specified offsets
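The per-partition JSON value floated in the comment above could be encoded and decoded along these lines. This is a sketch of the proposal only, not Spark's settled option format; the function names are made up, and the -1/-2 sentinels reuse Kafka's long-standing latest/earliest convention as the comment notes.

```python
import json

# Kafka's conventional sentinel offsets, as referenced in the comment.
EARLIEST, LATEST = -2, -1

def encode_starting_offsets(offsets):
    """{topic: {partition(int): offset(int)}} -> JSON string.

    Partitions become string keys, matching the {"topicfoo": {"0": 1234}}
    shape sketched in the comment."""
    return json.dumps(
        {topic: {str(p): o for p, o in parts.items()}
         for topic, parts in offsets.items()},
        sort_keys=True)

def decode_starting_offsets(s):
    """JSON string -> {topic: {partition(int): offset(int)}}."""
    return {topic: {int(p): int(o) for p, o in parts.items()}
            for topic, parts in json.loads(s).items()}

# Explicit offsets for topicfoo; topicbar partition 0 falls back to earliest.
encoded = encode_starting_offsets({"topicfoo": {0: 1234, 1: 4567},
                                   "topicbar": {0: EARLIEST}})
decoded = decode_starting_offsets(encoded)
```

Partitions left out of the map entirely would fall back to the auto.offset.reset equivalent, which is the mixed behavior the comment says the existing DStream supports.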
[jira] [Commented] (SPARK-17344) Kafka 0.8 support for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15558474#comment-15558474 ]

Cody Koeninger commented on SPARK-17344:
---
I think this is premature until you have a fully operational battlestation, er, structured stream, that has all the necessary features for 0.10.
Regarding the conversation with Michael about possibly using the kafka protocol directly as a way to work around the differences between 0.8 and 0.10: please don't consider that. Every kafka consumer implementation I've ever used has bugs, and we don't need to spend time writing another buggy one.
By contrast, writing a streaming source shim around the existing simple consumer-based 0.8 spark rdd would be a weekend project; it just wouldn't have stuff like SSL, dynamic topics, or offset committing.

> Kafka 0.8 support for Structured Streaming
>
>
> Key: SPARK-17344
> URL: https://issues.apache.org/jira/browse/SPARK-17344
> Project: Spark
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Frederick Reiss
>
> Design and implement Kafka 0.8-based sources and sinks for Structured
> Streaming.
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15554160#comment-15554160 ]

Cody Koeninger commented on SPARK-15406:
---
I think if you're already gravitating towards json as the lowest common denominator for the configuration interface, it makes sense to use that for the log.

> Structured streaming support for consuming from Kafka
>
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
> Issue Type: New Feature
> Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for building a Kafka source
> for Structured Streaming. Here is the design doc for an initial version of
> the Kafka Source:
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet. I personally feel
> like time based indexing would make for a much better interface, but it's
> been pushed back to kafka 0.10.1:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
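A JSON-based offset log of the kind suggested above, with one file per batch under `$checkpointLocation/offsets/$batchId` as described in SPARK-17837, could look roughly like this. The layout and field shapes here are assumptions for illustration; they are not Spark's actual on-disk checkpoint format.

```python
import json
import os
import tempfile

def write_offset_log(checkpoint_dir, batch_id, offsets):
    """Record a batch's ending Kafka offsets as human-readable JSON."""
    path = os.path.join(checkpoint_dir, "offsets")
    os.makedirs(path, exist_ok=True)
    with open(os.path.join(path, str(batch_id)), "w") as f:
        json.dump(offsets, f, sort_keys=True)

def read_offset_log(checkpoint_dir, batch_id):
    """Recover a batch's offsets; an operator could do this with a text editor."""
    with open(os.path.join(checkpoint_dir, "offsets", str(batch_id))) as f:
        return json.load(f)

ckpt = tempfile.mkdtemp()
write_offset_log(ckpt, 42, {"topicfoo": {"0": 1234, "1": 4567}})
recovered = read_offset_log(ckpt, 42)
```

The point of plain JSON over an opaque format is exactly the disaster-recovery scenario raised in this thread: a human can read the last good batch's offsets and restart from them by hand.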
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15554139#comment-15554139 ]

Cody Koeninger commented on SPARK-15406:
---
When something has gone wrong, as an end user, how do I get Kafka offsets out of a structured streaming checkpoint? Honest question, is there a way to do it? I did look, but probably not sufficiently thoroughly.
[jira] [Comment Edited] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553955#comment-15553955 ]

Cody Koeninger edited comment on SPARK-15406 at 10/7/16 3:20 AM:
---
As soon as you say "checkpoint", a whole class of people who were burned by the problems with DStream checkpoints are going to tune out. I don't trust anything unless I can get my hands on the offsets.
Regarding exactly once, this has two components: producing idempotently to kafka, and storing idempotently/transactionally downstream.
Specifically regarding producing to Kafka, I was talking with Jay Kreps about this yesterday. There have been a lot of ideas over the years (e.g. put-with-expected-offset). The top contender at this point is apparently something along the lines of a tcp/ip sequence number on the producer side allowing for repeated writes to be ignored on the broker side. The KIP doc for it should be coming "soon".
Specifically regarding the downstream write, without an actual implementation for real downstream systems and no way to get offsets except for individual messages, what exists currently couldn't even be tested in production at my current gig. I could probably pretty quickly write a jdbc sink and a Citus sink, but those seem like they would require significant knowledge and/or assumptions about the job. Having a schema helps a lot, but do you still want to assume things like a particular table format for outputs and offsets? That those sinks only work with Kafka?
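The "tcp/ip sequence number" idea described above can be sketched as a toy model: the broker tracks the highest sequence number accepted per producer and silently drops any resend at or below it, making retried writes idempotent. This is purely illustrative and not Kafka's actual implementation (the real design was later formalized in Kafka's KIP-98 idempotent producer work); the class and method names are invented.

```python
# Toy broker-side partition log with per-producer sequence-number dedupe.
class PartitionLog:
    def __init__(self):
        self.records = []
        self._last_seq = {}  # producer_id -> highest sequence accepted

    def append(self, producer_id, seq, value):
        # A retry carries the same sequence number, so it is recognized
        # and ignored instead of being written twice.
        if seq <= self._last_seq.get(producer_id, -1):
            return False
        self._last_seq[producer_id] = seq
        self.records.append(value)
        return True

log = PartitionLog()
log.append("producer-a", 0, "msg0")
log.append("producer-a", 1, "msg1")
retried = log.append("producer-a", 1, "msg1")  # network retry of msg1
```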