[jira] [Created] (SPARK-42750) Support INSERT INTO by name
Jose Torres created SPARK-42750: --- Summary: Support INSERT INTO by name Key: SPARK-42750 URL: https://issues.apache.org/jira/browse/SPARK-42750 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.0 Reporter: Jose Torres In some use cases, users have incoming dataframes with fixed column names which might differ from the canonical order. Currently there's no way to handle this easily through the INSERT INTO API - the user has to make sure the columns are in the right order, as they would when inserting a tuple. We should add an optional BY NAME clause, such that: INSERT INTO tgt BY NAME takes each column of the source and inserts it into the column in `tgt` which has the same name according to the configured `resolver` logic. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
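The proposed by-name matching can be sketched in a few lines of Scala (a minimal illustration assuming Spark's default case-insensitive resolution; `reorderByName` and all names here are hypothetical, not Spark's actual implementation):

```scala
// Sketch: reorder an incoming row's columns to match the target table's
// schema by name, rather than by position. `resolver` mimics the default
// case-insensitive name comparison mentioned in the issue.
val resolver: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)

def reorderByName(targetSchema: Seq[String], incoming: Map[String, Any]): Seq[Any] =
  targetSchema.map { tgt =>
    incoming.collectFirst { case (name, v) if resolver(name, tgt) => v }
      .getOrElse(sys.error(s"no incoming column matches target column '$tgt'"))
  }

// Incoming columns arrive in a different order (and case) than the target.
val reordered = reorderByName(Seq("A", "B"), Map("b" -> 2, "a" -> 1))
```

With positional INSERT the row above would land in the wrong columns; by-name matching makes the column order of the incoming dataframe irrelevant.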
[jira] [Updated] (SPARK-35246) Streaming-batch intersects are incorrectly allowed through UnsupportedOperationsChecker
[ https://issues.apache.org/jira/browse/SPARK-35246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres updated SPARK-35246: Summary: Streaming-batch intersects are incorrectly allowed through UnsupportedOperationsChecker (was: Disable intersects for all streaming queries) > Streaming-batch intersects are incorrectly allowed through > UnsupportedOperationsChecker > --- > > Key: SPARK-35246 > URL: https://issues.apache.org/jira/browse/SPARK-35246 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.0.0, 3.1.0 >Reporter: Jose Torres >Priority: Major > Fix For: 3.2.0 > > > The UnsupportedOperationChecker currently rejects streaming intersects only > if both sides are streaming, but they don't work if even one side is > streaming. The following simple test, for example, fails with a cryptic > None.get error because the state store can't plan itself properly. > {code:java} > test("intersect") { > val input = MemoryStream[Long] > val df = input.toDS().intersect(spark.range(10).as[Long]) > testStream(df) ( > AddData(input, 1L), > CheckAnswer(1) > ) > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35246) Disable intersects for all streaming queries
Jose Torres created SPARK-35246: --- Summary: Disable intersects for all streaming queries Key: SPARK-35246 URL: https://issues.apache.org/jira/browse/SPARK-35246 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 3.1.0, 3.0.0 Reporter: Jose Torres Fix For: 3.2.0 The UnsupportedOperationChecker currently rejects streaming intersects only if both sides are streaming, but they don't work if even one side is streaming. The following simple test, for example, fails with a cryptic None.get error because the state store can't plan itself properly. {code:java} test("intersect") { val input = MemoryStream[Long] val df = input.toDS().intersect(spark.range(10).as[Long]) testStream(df) ( AddData(input, 1L), CheckAnswer(1) ) } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
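The corrected check described above can be sketched as follows (`Plan` is an illustrative stand-in for a logical plan, not the UnsupportedOperationChecker's real types):

```scala
// Sketch: an intersect must be rejected if *either* side is streaming,
// not only when both sides are streaming, since streaming-batch
// intersects also fail at state store planning time.
case class Plan(isStreaming: Boolean)

def intersectSupported(left: Plan, right: Plan): Boolean =
  !left.isStreaming && !right.isStreaming  // only batch-batch is allowed
```

Under the old rule, a streaming-batch pair slipped through and failed later with the cryptic None.get error shown in the test above.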
[jira] [Created] (SPARK-29468) Floating point literals produce incorrect SQL
Jose Torres created SPARK-29468: --- Summary: Floating point literals produce incorrect SQL Key: SPARK-29468 URL: https://issues.apache.org/jira/browse/SPARK-29468 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.4 Reporter: Jose Torres A FLOAT literal 1.2345 returns SQL `CAST(1.2345 AS FLOAT)`. For very small values this doesn't work; `CAST(1e-44 AS FLOAT)` for example doesn't parse, because the parser tries to squeeze the numeric literal 1e-44 into a DECIMAL(38). -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
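One possible workaround can be sketched as follows; `floatLiteralSql` is a hypothetical helper, not the actual Spark fix:

```scala
// Sketch: emit the float value as a quoted string inside the CAST, so the
// SQL parser never has to fit the bare numeric literal into DECIMAL(38).
def floatLiteralSql(f: Float): String = s"CAST('${f.toString}' AS FLOAT)"
```

A string literal round-trips through the parser regardless of magnitude, so tiny values like 1e-44 no longer fail to parse.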
[jira] [Created] (SPARK-29103) CheckAnalysis for data source V2 ALTER TABLE ignores case sensitivity
Jose Torres created SPARK-29103: --- Summary: CheckAnalysis for data source V2 ALTER TABLE ignores case sensitivity Key: SPARK-29103 URL: https://issues.apache.org/jira/browse/SPARK-29103 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Jose Torres For each column referenced, we run ```val field = table.schema.findNestedField(fieldName, includeCollections = true)``` and fail analysis if the field isn't there. This check is always case-sensitive on column names, even if the underlying catalog is case insensitive, so it will sometimes throw on ALTER operations which the catalog supports. -- This message was sent by Atlassian Jira (v8.3.2#803003) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
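The fix amounts to threading the catalog's case sensitivity into the lookup. A minimal sketch (`findField` and the flat schema are simplifications of `findNestedField`, not Spark's code):

```scala
// Sketch: choose the name comparison based on the catalog's configured
// case sensitivity instead of always comparing case-sensitively.
def findField(schema: Seq[String], fieldName: String,
              caseSensitive: Boolean): Option[String] = {
  val resolver: (String, String) => Boolean =
    if (caseSensitive) (a, b) => a == b
    else (a, b) => a.equalsIgnoreCase(b)
  schema.find(resolver(_, fieldName))
}
```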
[jira] [Resolved] (SPARK-26046) Add a way for StreamingQueryManager to remove all listeners
[ https://issues.apache.org/jira/browse/SPARK-26046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres resolved SPARK-26046. - Fix Version/s: 3.0.0 Resolution: Fixed Issue resolved by pull request 25518 [https://github.com/apache/spark/pull/25518] > Add a way for StreamingQueryManager to remove all listeners > --- > > Key: SPARK-26046 > URL: https://issues.apache.org/jira/browse/SPARK-26046 > Project: Spark > Issue Type: Task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Mukul Murthy >Priority: Major > Fix For: 3.0.0 > > > StreamingQueryManager should have a way to clear out all listeners. There's > addListener(listener) and removeListener(listener), but not > removeAllListeners. We should expose a new method -removeAllListeners() that > calls listenerBus.removeAllListeners (added here: > [https://github.com/apache/spark/commit/9690eba16efe6d25261934d8b73a221972b684f3])- > listListeners() that can be used to remove listeners. -- This message was sent by Atlassian Jira (v8.3.2#803003) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-28190) Data Source - State
[ https://issues.apache.org/jira/browse/SPARK-28190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16911678#comment-16911678 ] Jose Torres commented on SPARK-28190: - Yeah, I think an SPIP is needed here. It sounds like we're planning to support state read and write as external interfaces, so we need a broad consensus on what those interfaces should be and how they'll constrain future evolvability. > Data Source - State > --- > > Key: SPARK-28190 > URL: https://issues.apache.org/jira/browse/SPARK-28190 > Project: Spark > Issue Type: Umbrella > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Jungtaek Lim >Priority: Major > > "State" is becoming one of the most important kinds of data in most > streaming frameworks, since it is what lets a query produce continuous > results. In other words, a query may no longer be valid once its state is > corrupted or lost. > Ideally we could rerun the query from the beginning of the data to construct > a brand-new state for the current query, but in reality that may not be > possible for many reasons: the input data source may have limited retention, > rerunning from the start wastes a lot of resources, etc. > > There are other cases where end users want to deal with state, like creating > initial state from existing data via a batch query (given that a batch query > can be far more efficient and faster). > I'd like to propose a new data source which handles "state" in batch > queries, enabling reads and writes on state. 
> Allowing state reads brings a couple of benefits: > * You can analyze the state from "outside" of your streaming query > * It can be useful when something can be derived from the existing state of > an existing query - note that state is not designed to be shared among > multiple queries > Allowing state (re)writes brings a couple of major benefits: > * State can be repartitioned physically > * The schema of the state can be changed, which means you don't need to run > the query from the start when the query has to change > * You can remove state rows if you want, e.g. to reduce size or drop corrupt > rows > * You can bootstrap the state of a new query from existing data efficiently, > without running the streaming query from the starting point > Btw, I'm basically planning to contribute my own work > ([https://github.com/HeartSaVioR/spark-state-tools]), so many of the > sub-issues should not require too much effort to submit patches for. I'll try > to apply the new DSv2, so that could be a major effort while preparing to > donate the code. -- This message was sent by Atlassian Jira (v8.3.2#803003) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-28223) stream-stream joins should fail unsupported checker in update mode
[ https://issues.apache.org/jira/browse/SPARK-28223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres resolved SPARK-28223. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 25023 [https://github.com/apache/spark/pull/25023] > stream-stream joins should fail unsupported checker in update mode > -- > > Key: SPARK-28223 > URL: https://issues.apache.org/jira/browse/SPARK-28223 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.3 >Reporter: Jose Torres >Priority: Major > Fix For: 3.0.0 > > > Right now they fail only for inner joins, because we implemented the check > when that was the only supported type. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-28223) stream-stream joins should fail unsupported checker in update mode
Jose Torres created SPARK-28223: --- Summary: stream-stream joins should fail unsupported checker in update mode Key: SPARK-28223 URL: https://issues.apache.org/jira/browse/SPARK-28223 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.3 Reporter: Jose Torres Right now they fail only for inner joins, because we implemented the check when that was the only supported type. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27711) InputFileBlockHolder should be unset at the end of tasks
Jose Torres created SPARK-27711: --- Summary: InputFileBlockHolder should be unset at the end of tasks Key: SPARK-27711 URL: https://issues.apache.org/jira/browse/SPARK-27711 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.3 Reporter: Jose Torres InputFileBlockHolder should be unset at the end of each task. Otherwise the value of input_file_name() can leak over to other tasks instead of beginning as empty string. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
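The fix pattern can be sketched as a thread-local cleared in a `finally` block (the holder and `runTask` below are illustrative stand-ins, not Spark's classes):

```scala
// Sketch: clear the thread-local holder at the end of every task so
// input_file_name() starts as the empty string for the next task that
// reuses the same thread.
object InputFileHolder {
  private val current = new ThreadLocal[String] {
    override def initialValue(): String = ""
  }
  def set(name: String): Unit = current.set(name)
  def get: String = current.get
  def unset(): Unit = current.remove()
}

def runTask[T](file: String)(body: => T): T = {
  InputFileHolder.set(file)
  try body finally InputFileHolder.unset()  // always unset, even on failure
}
```

Without the `finally`, a task that never reads an input file would observe whatever value the previous task on that thread left behind.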
[jira] [Resolved] (SPARK-27391) deadlock in ContinuousExecution unit tests
[ https://issues.apache.org/jira/browse/SPARK-27391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres resolved SPARK-27391. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24301 [https://github.com/apache/spark/pull/24301] > deadlock in ContinuousExecution unit tests > -- > > Key: SPARK-27391 > URL: https://issues.apache.org/jira/browse/SPARK-27391 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > Fix For: 3.0.0 > > > ContinuousExecution (in the final query execution phase) holds the lazy val > lock of its IncrementalExecution for the entire duration of the (indefinite > length) job. This can cause deadlocks in unit tests, which hook into internal > APIs and try to instantiate other lazy vals. > > (Note that this should not be able to affect production.) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27391) deadlock in ContinuousExecution unit tests
Jose Torres created SPARK-27391: --- Summary: deadlock in ContinuousExecution unit tests Key: SPARK-27391 URL: https://issues.apache.org/jira/browse/SPARK-27391 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres ContinuousExecution (in the final query execution phase) holds the lazy val lock of its IncrementalExecution for the entire duration of the (indefinite length) job. This can cause deadlocks in unit tests, which hook into internal APIs and try to instantiate other lazy vals. (Note that this should not be able to affect production.) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
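The hazard comes from how Scala 2 implements lazy vals: the initializer runs while holding the enclosing instance's monitor, so an initializer that lives as long as the job blocks any other thread forcing a lazy val on the same instance. A minimal sketch of the mitigation (`Execution` is illustrative, not Spark code):

```scala
// Sketch: force the lazy val *before* entering the long-running phase, so
// the indefinite-length work runs without holding the lazy-val lock and
// other threads (e.g. test hooks) can still touch lazy vals safely.
class Execution {
  lazy val executedPlan: String = "planned"

  def run(work: String => Unit): Unit = {
    val plan = executedPlan  // initialize eagerly; the monitor is released here
    work(plan)               // long-running phase holds no initialization lock
  }
}
```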
[jira] [Created] (SPARK-27253) SparkSession clone discards SQLConf overrides in favor of SparkConf defaults
Jose Torres created SPARK-27253: --- Summary: SparkSession clone discards SQLConf overrides in favor of SparkConf defaults Key: SPARK-27253 URL: https://issues.apache.org/jira/browse/SPARK-27253 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres SparkSession.cloneSession() is normally supposed to create a child session which inherits all the SQLConf values of its parent session. But when a SQL conf is given a global default through the SparkConf, this does not happen; the child session will receive the SparkConf default rather than its parent's SQLConf override. This is particularly impactful in structured streaming, as the microbatches run in a cloned child session. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
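The intended precedence when cloning can be sketched with plain maps standing in for the real conf objects (names are illustrative):

```scala
// Sketch: a cloned child session should start from the SparkConf-derived
// defaults, then apply the parent's SQLConf overrides on top, so that the
// parent's overrides win on any conflicting key.
def cloneSessionConf(sparkConfDefaults: Map[String, String],
                     parentSqlConf: Map[String, String]): Map[String, String] =
  sparkConfDefaults ++ parentSqlConf  // right-hand operand wins on conflicts
```

The reported bug is the opposite order of precedence: the SparkConf default clobbers the parent's override, which then surfaces inside streaming microbatches because they run in a cloned session.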
[jira] [Commented] (SPARK-24541) TCP based shuffle
[ https://issues.apache.org/jira/browse/SPARK-24541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16758488#comment-16758488 ] Jose Torres commented on SPARK-24541: - I'm not gonna lie, I didn't put a tremendous amount of thought into the title of the Jira ticket. There's a strong argument that using Netty is indeed the right decision here. (Although we have to keep scalability in mind; we'll eventually need to do some kind of multiplexing to support even moderately sized N to N shuffles, so we should probably stay compatible with that.) I'd guess that the RPC framework does carry a performance penalty from things such as extra headers, but I'd argue the major disadvantage is that it's not the right abstraction layer. RPCs normally live exclusively in the control plane. > TCP based shuffle > - > > Key: SPARK-24541 > URL: https://issues.apache.org/jira/browse/SPARK-24541 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26170) Add missing metrics in FlatMapGroupsWithState
[ https://issues.apache.org/jira/browse/SPARK-26170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres resolved SPARK-26170. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23142 [https://github.com/apache/spark/pull/23142] > Add missing metrics in FlatMapGroupsWithState > - > > Key: SPARK-26170 > URL: https://issues.apache.org/jira/browse/SPARK-26170 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Jungtaek Lim >Priority: Major > Fix For: 3.0.0 > > > Credit to [~jlaskowski]: I can't change reporter so please try to change it > if you would like to registered as reporter of this issue. > Unlike other operators like StateStoreSaveExec, StreamingDeduplicateExec, and > StreamingGlobalLimitExec, FlatMapGroupsWithStateExec doesn't measure > available metrics in StateStoreWriter, so only part of metrics are measured > in setStoreMetrics. > This issue is to track the effort to measure all possible metrics from > FlatMapGroupsWithStateExec. Some metrics will be kept unavailable. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23098) Migrate Kafka batch source to v2
[ https://issues.apache.org/jira/browse/SPARK-23098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606758#comment-16606758 ] Jose Torres commented on SPARK-23098: - SPARK-23362 migrated the (microbatch) streaming source. This subtask is for migrating the non-streaming source. > Migrate Kafka batch source to v2 > > > Key: SPARK-23098 > URL: https://issues.apache.org/jira/browse/SPARK-23098 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-25204) rate source test is flaky
Jose Torres created SPARK-25204: --- Summary: rate source test is flaky Key: SPARK-25204 URL: https://issues.apache.org/jira/browse/SPARK-25204 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.3.1 Reporter: Jose Torres We try to restart a manually clocked rate stream in a test. This is inherently race-prone, because the stream will go backwards in time (and throw an assertion failure) if the clock can't be incremented before it tries to schedule the first batch. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-23556) design doc for write side
[ https://issues.apache.org/jira/browse/SPARK-23556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres resolved SPARK-23556. - Resolution: Fixed > design doc for write side > - > > Key: SPARK-23556 > URL: https://issues.apache.org/jira/browse/SPARK-23556 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-23557) design doc for read side
[ https://issues.apache.org/jira/browse/SPARK-23557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres resolved SPARK-23557. - Resolution: Done > design doc for read side > > > Key: SPARK-23557 > URL: https://issues.apache.org/jira/browse/SPARK-23557 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23557) design doc for read side
[ https://issues.apache.org/jira/browse/SPARK-23557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525308#comment-16525308 ] Jose Torres commented on SPARK-23557: - https://docs.google.com/document/d/1VzxEuvpLfuHKL6vJO9qJ6ug0x9J_gLoLSH_vJL3-Cho > design doc for read side > > > Key: SPARK-23557 > URL: https://issues.apache.org/jira/browse/SPARK-23557 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23102) Migrate kafka sink
[ https://issues.apache.org/jira/browse/SPARK-23102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525307#comment-16525307 ] Jose Torres commented on SPARK-23102: - Yeah, KafkaStreamWriter already took care of this issue. I think we ended up with duplicate Jira tickets for the Kafka migration. > Migrate kafka sink > -- > > Key: SPARK-23102 > URL: https://issues.apache.org/jira/browse/SPARK-23102 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-23102) Migrate kafka sink
[ https://issues.apache.org/jira/browse/SPARK-23102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres resolved SPARK-23102. - Resolution: Duplicate > Migrate kafka sink > -- > > Key: SPARK-23102 > URL: https://issues.apache.org/jira/browse/SPARK-23102 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23014) Migrate MemorySink fully to v2
[ https://issues.apache.org/jira/browse/SPARK-23014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525305#comment-16525305 ] Jose Torres commented on SPARK-23014: - I'm not currently. I ran into a problem trying to make MemorySinkV2 display proper errors in PySpark, and I haven't been able to find time to address it. I've closed the PR so someone else can pick this up if they'd like - otherwise I'll get back to it eventually. > Migrate MemorySink fully to v2 > -- > > Key: SPARK-23014 > URL: https://issues.apache.org/jira/browse/SPARK-23014 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > There's already a MemorySinkV2, but its use is controlled by a flag. We need > to remove the V1 sink and always use the V2 sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24541) TCP based shuffle
Jose Torres created SPARK-24541: --- Summary: TCP based shuffle Key: SPARK-24541 URL: https://issues.apache.org/jira/browse/SPARK-24541 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24460) exactly-once mode
Jose Torres created SPARK-24460: --- Summary: exactly-once mode Key: SPARK-24460 URL: https://issues.apache.org/jira/browse/SPARK-24460 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres Major changes we know will need to be made: * Restart strategy needs to replay offsets already in the log as microbatches. * Some kind of epoch alignment - need to think about this further. We'll also need a test plan to ensure that we've actually achieved exactly once. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
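The restart strategy above can be sketched as follows (all types are illustrative placeholders, not the actual offset-log API):

```scala
// Sketch: on restart, offsets already committed to the log are replayed
// as microbatches before a new batch is planned from the latest
// available offset, so no committed range is skipped or duplicated.
case class OffsetRange(start: Long, end: Long)

def planOnRestart(committed: Seq[OffsetRange], latest: Long): Seq[OffsetRange] = {
  val resumeFrom = committed.lastOption.map(_.end).getOrElse(0L)
  committed :+ OffsetRange(resumeFrom, latest)  // replay, then continue
}
```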
[jira] [Created] (SPARK-24459) watermarks
Jose Torres created SPARK-24459: --- Summary: watermarks Key: SPARK-24459 URL: https://issues.apache.org/jira/browse/SPARK-24459 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24374) SPIP: Support Barrier Scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16498772#comment-16498772 ] Jose Torres commented on SPARK-24374: - Yeah, continuous processing (SPARK-20928) is another good use case for barrier scheduling. In addition to ensuring all tasks are actually scheduled, it or something very much like it is necessary to implement the continuous processing shuffle operator (https://issues.apache.org/jira/browse/SPARK-24036) - every writer needs to know where its corresponding readers are located, and we can't split the work into multiple jobs while maintaining the latency characteristics we're targeting. I can understand the concern about trying to make Spark's job scheduler work as a cluster manager, but I don't think this SPIP risks starting down that road. Conceptually, we're adding a compact and targeted feature: job-level synchronization without the costs of re-scheduling everything. > SPIP: Support Barrier Scheduling in Apache Spark > > > Key: SPARK-24374 > URL: https://issues.apache.org/jira/browse/SPARK-24374 > Project: Spark > Issue Type: Epic > Components: ML, Spark Core >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Xiangrui Meng >Priority: Major > Labels: SPIP > Attachments: SPIP_ Support Barrier Scheduling in Apache Spark.pdf > > > (See details in the linked/attached SPIP doc.) > {quote} > The proposal here is to add a new scheduling model to Apache Spark so users > can properly embed distributed DL training as a Spark stage to simplify the > distributed training workflow. For example, Horovod uses MPI to implement > all-reduce to accelerate distributed TensorFlow training. The computation > model is different from MapReduce used by Spark. In Spark, a task in a stage > doesn’t depend on any other tasks in the same stage, and hence it can be > scheduled independently. In MPI, all workers start at the same time and pass > messages around. 
To embed this workload in Spark, we need to introduce a new > scheduling model, tentatively named “barrier scheduling”, which launches > tasks at the same time and provides users enough information and tooling to > embed distributed DL training. Spark can also provide an extra layer of fault > tolerance in case some tasks failed in the middle, where Spark would abort > all tasks and restart the stage. > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
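The synchronization model the SPIP describes can be sketched with a plain barrier, with threads standing in for Spark tasks (this illustrates the scheduling semantics only, not the proposed API):

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CyclicBarrier}

// Sketch: all "tasks" launch together and synchronize at a barrier
// before the message-passing phase begins, MPI-style.
val numTasks = 4
val barrier = new CyclicBarrier(numTasks)
val results = new ConcurrentLinkedQueue[Int]()

val threads = (0 until numTasks).map { i =>
  new Thread(() => {
    barrier.await()  // no task proceeds until all tasks are running
    results.add(i)   // an all-reduce-style phase would start here
  })
}
threads.foreach(_.start())
threads.foreach(_.join())
```

If only three of the four tasks could be scheduled, `barrier.await()` would hang rather than compute a wrong answer, which is why the stage-level abort-and-restart fault tolerance in the quote is needed.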
[jira] [Created] (SPARK-24386) implement continuous processing coalesce(1)
Jose Torres created SPARK-24386: --- Summary: implement continuous processing coalesce(1) Key: SPARK-24386 URL: https://issues.apache.org/jira/browse/SPARK-24386 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres [~marmbrus] suggested this as a good implementation checkpoint. If we do the shuffle reader and writer correctly, it should be easy to make a custom coalesce(1) execution for continuous processing using them, without having to implement the logic for shuffle writers finding out where shuffle readers are located. (The coalesce(1) can just get the RpcEndpointRef directly from the reader and pass it to the writers.) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
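The shortcut described above can be sketched as follows (all types are illustrative stand-ins for the RPC machinery, not Spark's classes):

```scala
// Sketch: with coalesce(1) there is exactly one shuffle reader, so its
// endpoint reference can be handed directly to every writer, skipping
// any general reader-discovery logic.
case class RpcEndpointRef(name: String)

class CoalescedReader {
  val endpoint: RpcEndpointRef = RpcEndpointRef("continuous-reader-0")
}
class ShuffleWriter(val target: RpcEndpointRef)

val reader = new CoalescedReader
// Each upstream partition's writer is wired to the single reader.
val writers = Seq.tabulate(3)(_ => new ShuffleWriter(reader.endpoint))
```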
[jira] [Comment Edited] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489648#comment-16489648 ] Jose Torres edited comment on SPARK-24036 at 5/24/18 8:23 PM: -- I've been notified of [https://issues.apache.org/jira/projects/SPARK/issues/SPARK-24374], a SPIP for an API which would provide much of what we need here wrt letting tasks know where the appropriate shuffle endpoints are. was (Author: joseph.torres): I've been notified of [https://issues.apache.org/jira/projects/SPARK/issues/SPARK-24374], a SPIP for an API which would provide much of what we need here wrt letting tasks know where the appropriate shuffle endpoints are. > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489648#comment-16489648 ] Jose Torres edited comment on SPARK-24036 at 5/24/18 8:22 PM: -- I've been notified of [https://issues.apache.org/jira/projects/SPARK/issues/SPARK-24374], a SPIP for an API which would provide much of what we need here wrt letting tasks know where the appropriate shuffle endpoints are. was (Author: joseph.torres): I've been notified of [https://issues.apache.org/jira/projects/SPARK/issues/SPARK-24374], a SPIP for an API which would provide much of what we need here wrt letting tasks know where the appropriate shuffle endpoints are. > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489727#comment-16489727 ] Jose Torres commented on SPARK-24036: - That's out of scope - the shuffle reader and writer work in this Jira would still be needed on top. > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23416) Flaky test: KafkaSourceStressForDontFailOnDataLossSuite.stress test for failOnDataLoss=false
[ https://issues.apache.org/jira/browse/SPARK-23416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489661#comment-16489661 ] Jose Torres commented on SPARK-23416: - Do you know how to drive that? I'm not sure what the process is. > Flaky test: KafkaSourceStressForDontFailOnDataLossSuite.stress test for > failOnDataLoss=false > > > Key: SPARK-23416 > URL: https://issues.apache.org/jira/browse/SPARK-23416 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Assignee: Jose Torres >Priority: Minor > Fix For: 2.4.0 > > > I suspect this is a race condition latent in the DataSourceV2 write path, or > at least the interaction of that write path with StreamTest. > [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport/org.apache.spark.sql.kafka010/KafkaSourceStressForDontFailOnDataLossSuite/stress_test_for_failOnDataLoss_false/] > h3. Error Message > org.apache.spark.sql.streaming.StreamingQueryException: Query [id = > 16b2a2b1-acdd-44ec-902f-531169193169, runId = > 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job > aborted. > h3. Stacktrace > sbt.ForkMain$ForkError: > org.apache.spark.sql.streaming.StreamingQueryException: Query [id = > 16b2a2b1-acdd-44ec-902f-531169193169, runId = > 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job > aborted. at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) > Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Writing > job aborted. 
at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:108) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > at > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) > at > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) > at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) at > org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$15.apply(MicroBatchExecution.scala:488) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:483) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:482) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at >
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489648#comment-16489648 ] Jose Torres commented on SPARK-24036: - I've been notified of [https://issues.apache.org/jira/projects/SPARK/issues/SPARK-24374], a SPIP for an API which would provide much of what we need here wrt letting tasks know where the appropriate shuffle endpoints are. > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23416) Flaky test: KafkaSourceStressForDontFailOnDataLossSuite.stress test for failOnDataLoss=false
[ https://issues.apache.org/jira/browse/SPARK-23416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481740#comment-16481740 ] Jose Torres commented on SPARK-23416: - No problem. I've been working on this since last week. > Flaky test: KafkaSourceStressForDontFailOnDataLossSuite.stress test for > failOnDataLoss=false > > > Key: SPARK-23416 > URL: https://issues.apache.org/jira/browse/SPARK-23416 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Minor > Fix For: 2.3.0 > > > I suspect this is a race condition latent in the DataSourceV2 write path, or > at least the interaction of that write path with StreamTest. > [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport/org.apache.spark.sql.kafka010/KafkaSourceStressForDontFailOnDataLossSuite/stress_test_for_failOnDataLoss_false/] > h3. Error Message > org.apache.spark.sql.streaming.StreamingQueryException: Query [id = > 16b2a2b1-acdd-44ec-902f-531169193169, runId = > 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job > aborted. > h3. Stacktrace > sbt.ForkMain$ForkError: > org.apache.spark.sql.streaming.StreamingQueryException: Query [id = > 16b2a2b1-acdd-44ec-902f-531169193169, runId = > 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job > aborted. at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) > Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Writing > job aborted. 
at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:108) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > at > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) > at > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) > at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) at > org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$15.apply(MicroBatchExecution.scala:488) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:483) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:482) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) >
[jira] [Commented] (SPARK-23504) Flaky test: RateSourceV2Suite.basic microbatch execution
[ https://issues.apache.org/jira/browse/SPARK-23504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481739#comment-16481739 ] Jose Torres commented on SPARK-23504: - The test run posted appears to be very old - the most recent pull request builder is 90838. Unless there's a more recent example, I think I'd stand by my claim that PR 20688 solved the problem. > Flaky test: RateSourceV2Suite.basic microbatch execution > > > Key: SPARK-23504 > URL: https://issues.apache.org/jira/browse/SPARK-23504 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Priority: Major > > Seen on an unrelated change: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport/org.apache.spark.sql.execution.streaming/RateSourceV2Suite/basic_microbatch_execution/ > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: == Results == !== Correct > Answer - 10 == == Spark Answer - 0 == !struct<_1:timestamp,_2:int> > struct<> ![1969-12-31 16:00:00.0,0] ![1969-12-31 16:00:00.1,1] > ![1969-12-31 16:00:00.2,2] ![1969-12-31 16:00:00.3,3] ![1969-12-31 > 16:00:00.4,4] ![1969-12-31 16:00:00.5,5] ![1969-12-31 16:00:00.6,6] > ![1969-12-31 16:00:00.7,7] ![1969-12-31 16:00:00.8,8] > ![1969-12-31 16:00:00.9,9]== Progress == > AdvanceRateManualClock(1) => CheckLastBatch: [1969-12-31 > 16:00:00.0,0],[1969-12-31 16:00:00.1,1],[1969-12-31 16:00:00.2,2],[1969-12-31 > 16:00:00.3,3],[1969-12-31 16:00:00.4,4],[1969-12-31 16:00:00.5,5],[1969-12-31 > 16:00:00.6,6],[1969-12-31 16:00:00.7,7],[1969-12-31 16:00:00.8,8],[1969-12-31 > 16:00:00.9,9]StopStream > StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@22bc97a,Map(),null) > AdvanceRateManualClock(2)CheckLastBatch: [1969-12-31 > 16:00:01.0,10],[1969-12-31 16:00:01.1,11],[1969-12-31 > 16:00:01.2,12],[1969-12-31 16:00:01.3,13],[1969-12-31 > 16:00:01.4,14],[1969-12-31 16:00:01.5,15],[1969-12-31 > 16:00:01.6,16],[1969-12-31 
16:00:01.7,17],[1969-12-31 > 16:00:01.8,18],[1969-12-31 16:00:01.9,19] == Stream == Output Mode: Append > Stream state: > {org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchReader@75b88292: > {"0":{"value":-1,"runTimeMs":0}}} Thread state: alive Thread stack trace: > sun.misc.Unsafe.park(Native Method) > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) > org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222) > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633) > org.apache.spark.SparkContext.runJob(SparkContext.scala:2030) > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:84) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) > 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) > org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) >
[jira] [Commented] (SPARK-24234) create the bottom-of-task RDD with row buffer
[ https://issues.apache.org/jira/browse/SPARK-24234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474993#comment-16474993 ] Jose Torres commented on SPARK-24234: - I'll work on this. > create the bottom-of-task RDD with row buffer > - > > Key: SPARK-24234 > URL: https://issues.apache.org/jira/browse/SPARK-24234 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] > > This probably ought to be an abstraction of ContinuousDataSourceRDD and > ContinuousQueuedDataReader. These classes do pretty much exactly what's > needed, except the buffer is filled from the remote data source instead of a > remote Spark task. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470552#comment-16470552 ] Jose Torres commented on SPARK-24036: - My concern isn't that we'll have to write more code, but that changing scheduler internals expands the surface area of interactions that need to be considered. For example, can we confidently enumerate all the ways in which the scheduler assumes a Dependency defines a stage boundary? If so, can we change all of them in a way that doesn't impact non-continuous-processing code at all? We'd have to consider a lot of questions like that, and I don't see any large benefit we'd get from doing so. Glad to take a look at your preview PR. > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469863#comment-16469863 ] Jose Torres commented on SPARK-24036: - The way I was envisioning it, there would be four kinds of tasks when we're done: * reader-only, which has a ContinuousDataReader at the bottom and one of the new queue writers at the top * intermediate, which has one of the new queue readers at the bottom and one of the new queue writers at the top * writer-only, which has one of the new queue readers at the bottom and a DataWriter (to the remote data sink) at the top * reader-writer, which has a ContinuousDataReader at the bottom and a DataWriter at the top But each of these would be implemented as partitions of the ContinuousWriteRDD, allowing all of this to be opaque to the scheduler. Changing DAGScheduler to accommodate continuous processing would create significant additional complexity I don't think we can really justify. Whether we need to write an explicit shuffle RDD class or not would I think come down to an implementation detail of SPARK-24236. It depends on what's the cleanest way to unfold the SparkPlan tree. > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
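The four task kinds described in the comment above reduce to combinations of a row source and a row sink within a single task. A minimal illustrative Scala sketch (these type names are hypothetical, not actual Spark classes):

```scala
// Hypothetical modeling of the four continuous-processing task shapes.
// Each task pairs where its rows come from with where they go.
sealed trait RowSource
case object ContinuousDataReaderSource extends RowSource // reads from the external source
case object QueueReaderSource extends RowSource          // reads from an upstream task's queue

sealed trait RowSink
case object QueueWriterSink extends RowSink // writes to a downstream task's queue
case object DataWriterSink extends RowSink  // writes to the external sink

/** A task shape is just a (source, sink) pair. */
final case class ContinuousTaskShape(source: RowSource, sink: RowSink)

val readerOnly   = ContinuousTaskShape(ContinuousDataReaderSource, QueueWriterSink)
val intermediate = ContinuousTaskShape(QueueReaderSource, QueueWriterSink)
val writerOnly   = ContinuousTaskShape(QueueReaderSource, DataWriterSink)
val readerWriter = ContinuousTaskShape(ContinuousDataReaderSource, DataWriterSink)
```

Implementing every shape as a partition of the one ContinuousWriteRDD is what keeps the topology opaque to the scheduler, per the comment above.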
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469801#comment-16469801 ] Jose Torres commented on SPARK-24036: - [~XuanYuan] Since it seems we've reached broad consensus on the doc, I've added the relevant subtasks here. The stateful operator rewind is part of the "support single partition aggregates" PR I have out. > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24041) add flag to remove whitelist of continuous processing operators
[ https://issues.apache.org/jira/browse/SPARK-24041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres resolved SPARK-24041. - Resolution: Not A Problem > add flag to remove whitelist of continuous processing operators > --- > > Key: SPARK-24041 > URL: https://issues.apache.org/jira/browse/SPARK-24041 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > Initially this will just be for unit testing of developing support, but in > the long term continuous processing should probably support most query nodes. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24041) add flag to remove whitelist of continuous processing operators
[ https://issues.apache.org/jira/browse/SPARK-24041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469800#comment-16469800 ] Jose Torres commented on SPARK-24041: - This isn't needed, we can just disable UnsupportedOperationChecker in the unit tests requiring it. > add flag to remove whitelist of continuous processing operators > --- > > Key: SPARK-24041 > URL: https://issues.apache.org/jira/browse/SPARK-24041 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > Initially this will just be for unit testing of developing support, but in > the long term continuous processing should probably support most query nodes. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24237) continuous shuffle dependency
Jose Torres created SPARK-24237: --- Summary: continuous shuffle dependency Key: SPARK-24237 URL: https://issues.apache.org/jira/browse/SPARK-24237 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] We might not need this to be an actual org.apache.spark.Dependency. We need to somehow register with MapOutputTracker, or write our own custom tracker if this ends up being infeasible. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24236) continuous replacement for ShuffleExchangeExec
Jose Torres created SPARK-24236: --- Summary: continuous replacement for ShuffleExchangeExec Key: SPARK-24236 URL: https://issues.apache.org/jira/browse/SPARK-24236 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres Two major differences from ShuffleExchangeExec: * The post-shuffle RDD should contain all the tasks, rather than reporting scheduler-level dependencies. * The operator will have to manually register the continuous shuffle dependencies, since they're not reported to the scheduler. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24235) create the top-of-task RDD sending rows to the remote buffer
Jose Torres created SPARK-24235: --- Summary: create the top-of-task RDD sending rows to the remote buffer Key: SPARK-24235 URL: https://issues.apache.org/jira/browse/SPARK-24235 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] Note that after [https://github.com/apache/spark/pull/21239] this will need to be responsible for incrementing its task's EpochTracker. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24234) create the bottom-of-task RDD with row buffer
[ https://issues.apache.org/jira/browse/SPARK-24234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres updated SPARK-24234: Summary: create the bottom-of-task RDD with row buffer (was: Write RDD with row buffer) > create the bottom-of-task RDD with row buffer > - > > Key: SPARK-24234 > URL: https://issues.apache.org/jira/browse/SPARK-24234 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] > > This probably ought to be an abstraction of ContinuousDataSourceRDD and > ContinuousQueuedDataReader. These classes do pretty much exactly what's > needed, except the buffer is filled from the remote data source instead of a > remote Spark task. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24234) Write RDD with row buffer
Jose Torres created SPARK-24234: --- Summary: Write RDD with row buffer Key: SPARK-24234 URL: https://issues.apache.org/jira/browse/SPARK-24234 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] This probably ought to be an abstraction of ContinuousDataSourceRDD and ContinuousQueuedDataReader. These classes do pretty much exactly what's needed, except the buffer is filled from the remote data source instead of a remote Spark task. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23703) Collapse sequential watermarks
[ https://issues.apache.org/jira/browse/SPARK-23703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464853#comment-16464853 ] Jose Torres commented on SPARK-23703: - Up to you. It might be worth asking if there are use cases for that kind of thing, but on the other hand I don't know of other systems that support it. > Collapse sequential watermarks > --- > > Key: SPARK-23703 > URL: https://issues.apache.org/jira/browse/SPARK-23703 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > When there are two sequential EventTimeWatermark nodes in a query plan, the > topmost one overrides the column tracking metadata from its children, but > leaves the nodes themselves untouched. When there is no intervening stateful > operation to consume the watermark, we should remove the lower node entirely. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24183) add unit tests for ContinuousDataReader hook
Jose Torres created SPARK-24183: --- Summary: add unit tests for ContinuousDataReader hook Key: SPARK-24183 URL: https://issues.apache.org/jira/browse/SPARK-24183 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres Currently this is the class named ContinuousQueuedDataReader, but I don't know if this will change as we deal with stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23703) Collapse sequential watermarks
[ https://issues.apache.org/jira/browse/SPARK-23703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463190#comment-16463190 ] Jose Torres commented on SPARK-23703: - No, I don't know of any actual use cases for this. I think just writing an analyzer rule disallowing it could be a valid resolution here. > Collapse sequential watermarks > --- > > Key: SPARK-23703 > URL: https://issues.apache.org/jira/browse/SPARK-23703 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > When there are two sequential EventTimeWatermark nodes in a query plan, the > topmost one overrides the column tracking metadata from its children, but > leaves the nodes themselves untouched. When there is no intervening stateful > operation to consume the watermark, we should remove the lower node entirely. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23703) Collapse sequential watermarks
[ https://issues.apache.org/jira/browse/SPARK-23703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462584#comment-16462584 ] Jose Torres commented on SPARK-23703: - I'm no longer entirely convinced that this (and the parent JIRA) are correct. We might not want to support these scenarios at all. The question here is what we should do with the query: df.withWatermark(“a”, …) .withWatermark(“b”, …) .agg(...) What we do right now is definitely wrong. We (in MicroBatchExecution) calculate separate watermarks on "a" and "b", take their minimum, and then pass that as the watermark value to the aggregate. But the aggregate only sees "b" as a watermarked column, because only "b" has EventTimeWatermark.delayKey set in its attribute metadata at the aggregate node. EventTimeWatermark("b").output erases the metadata for "a" in its output. So we need to somehow resolve this mismatch. > Collapse sequential watermarks > --- > > Key: SPARK-23703 > URL: https://issues.apache.org/jira/browse/SPARK-23703 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > When there are two sequential EventTimeWatermark nodes in a query plan, the > topmost one overrides the column tracking metadata from its children, but > leaves the nodes themselves untouched. When there is no intervening stateful > operation to consume the watermark, we should remove the lower node entirely. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
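Written out concretely, the query shape discussed in the comment above looks like this (an illustrative sketch only; the column names and delay intervals are arbitrary, and it assumes a streaming DataFrame `df` with timestamp columns "a" and "b" plus `import spark.implicits._` in scope):

```scala
import org.apache.spark.sql.functions.{count, window}

val aggregated = df
  .withWatermark("a", "10 minutes") // watermark metadata for "a" is erased by the node below
  .withWatermark("b", "5 minutes")  // only "b" carries delayKey metadata at the aggregate
  .groupBy(window($"b", "1 minute"))
  .agg(count("*"))
// Per the comment above: MicroBatchExecution still tracks watermarks on both
// "a" and "b" and passes their minimum to the aggregate, even though the
// aggregate only sees "b" as watermarked -- the mismatch being described.
```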
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454788#comment-16454788 ] Jose Torres commented on SPARK-24036: - https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE I wrote a quick doc summarizing my thoughts. TLDR is: * I think it's better to not reuse the existing shuffle infrastructure - we'll have to do more work to get good performance later, but current shuffle has very bad characteristics for what continuous processing is trying to do. In particular I doubt we'd be able to maintain millisecond-scale latency with anything like UnsafeShuffleWriter. * It's a small diff on top of a working shuffle to support exactly-once state management. I don't think the coordinator needs to worry about stateful operators; a writer will never commit if a stateful operator below it fails to checkpoint, and the stateful operator itself can rewind if it commits an epoch that ends up failing. Let me know what you two think. I'll send this out to the dev list if it looks reasonable, and then we can start thinking about how this breaks down into individual tasks. > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16452576#comment-16452576 ] Jose Torres commented on SPARK-24036: - The broader Spark community is of course always welcome to help. The work here is generally split into three components: * Supporting single partition aggregates. I have a substantially complete prototype of this in [https://github.com/jose-torres/spark/pull/13] - it doesn't really involve design as much as removing a very silly hack I put in earlier. * Extending support to make continuous queries with multiple partitions run. My experimentation suggests that this only requires making ShuffleExchangeExec not cache its RDD in continuous mode, but I haven't strongly verified this. * Making the multiple partition aggregates truly continuous. ShuffleExchangeExec will of course insert a stage boundary, which means that latency will end up being bound by the checkpoint interval. What we need to do is create a new kind of shuffle for continuous processing which is non-blocking (cc [~liweisheng]). There are two possibilities here which I haven't evaluated in detail: ** Reuse the existing shuffle infrastructure, optimizing for latency later if needed. ** Just write RPC endpoints on both ends tossing rows around, optimizing for throughput later if needed. (I'm leaning towards this one.) If you're interested in working on some of this, I can prioritize a design for that third part. > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24041) add flag to remove whitelist of continuous processing operators
Jose Torres created SPARK-24041: --- Summary: add flag to remove whitelist of continuous processing operators Key: SPARK-24041 URL: https://issues.apache.org/jira/browse/SPARK-24041 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres Initially this will just be for unit testing while support is under development, but in the long term continuous processing should probably support most query nodes. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24039) remove restarting iterators hack
Jose Torres created SPARK-24039: --- Summary: remove restarting iterators hack Key: SPARK-24039 URL: https://issues.apache.org/jira/browse/SPARK-24039 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres Currently, continuous processing execution calls next() to restart the query iterator after it returns false. This doesn't work for complex RDDs - we need to call compute() instead. This isn't refactoring-only; changes will be required to keep the reader from starting over in each compute() call. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24040) support single partition aggregates
Jose Torres created SPARK-24040: --- Summary: support single partition aggregates Key: SPARK-24040 URL: https://issues.apache.org/jira/browse/SPARK-24040 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres Single partition aggregates are a useful milestone because they don't involve a shuffle. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24038) refactor continuous write exec to its own class
Jose Torres created SPARK-24038: --- Summary: refactor continuous write exec to its own class Key: SPARK-24038 URL: https://issues.apache.org/jira/browse/SPARK-24038 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24037) stateful operators
Jose Torres created SPARK-24037: --- Summary: stateful operators Key: SPARK-24037 URL: https://issues.apache.org/jira/browse/SPARK-24037 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres pointer to https://issues.apache.org/jira/browse/SPARK-24036 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24036) Stateful operators in continuous processing
Jose Torres created SPARK-24036: --- Summary: Stateful operators in continuous processing Key: SPARK-24036 URL: https://issues.apache.org/jira/browse/SPARK-24036 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres The first iteration of continuous processing in Spark 2.3 does not work with stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23887) update query progress
Jose Torres created SPARK-23887: --- Summary: update query progress Key: SPARK-23887 URL: https://issues.apache.org/jira/browse/SPARK-23887 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23886) update query.status
Jose Torres created SPARK-23886: --- Summary: update query.status Key: SPARK-23886 URL: https://issues.apache.org/jira/browse/SPARK-23886 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23504) Flaky test: RateSourceV2Suite.basic microbatch execution
[ https://issues.apache.org/jira/browse/SPARK-23504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423137#comment-16423137 ] Jose Torres commented on SPARK-23504: - We've replaced the hacky RateSourceV2 implementation with a proper one in https://github.com/apache/spark/pull/20688. I'm going to keep track of this for a bit longer, but hopefully the flakiness is gone now. > Flaky test: RateSourceV2Suite.basic microbatch execution > > > Key: SPARK-23504 > URL: https://issues.apache.org/jira/browse/SPARK-23504 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Priority: Major > > Seen on an unrelated change: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport/org.apache.spark.sql.execution.streaming/RateSourceV2Suite/basic_microbatch_execution/ > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: == Results == !== Correct > Answer - 10 == == Spark Answer - 0 == !struct<_1:timestamp,_2:int> > struct<> ![1969-12-31 16:00:00.0,0] ![1969-12-31 16:00:00.1,1] > ![1969-12-31 16:00:00.2,2] ![1969-12-31 16:00:00.3,3] ![1969-12-31 > 16:00:00.4,4] ![1969-12-31 16:00:00.5,5] ![1969-12-31 16:00:00.6,6] > ![1969-12-31 16:00:00.7,7] ![1969-12-31 16:00:00.8,8] > ![1969-12-31 16:00:00.9,9]== Progress == > AdvanceRateManualClock(1) => CheckLastBatch: [1969-12-31 > 16:00:00.0,0],[1969-12-31 16:00:00.1,1],[1969-12-31 16:00:00.2,2],[1969-12-31 > 16:00:00.3,3],[1969-12-31 16:00:00.4,4],[1969-12-31 16:00:00.5,5],[1969-12-31 > 16:00:00.6,6],[1969-12-31 16:00:00.7,7],[1969-12-31 16:00:00.8,8],[1969-12-31 > 16:00:00.9,9]StopStream > StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@22bc97a,Map(),null) > AdvanceRateManualClock(2)CheckLastBatch: [1969-12-31 > 16:00:01.0,10],[1969-12-31 16:00:01.1,11],[1969-12-31 > 16:00:01.2,12],[1969-12-31 16:00:01.3,13],[1969-12-31 > 16:00:01.4,14],[1969-12-31 
16:00:01.5,15],[1969-12-31 > 16:00:01.6,16],[1969-12-31 16:00:01.7,17],[1969-12-31 > 16:00:01.8,18],[1969-12-31 16:00:01.9,19] == Stream == Output Mode: Append > Stream state: > {org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchReader@75b88292: > {"0":{"value":-1,"runTimeMs":0}}} Thread state: alive Thread stack trace: > sun.misc.Unsafe.park(Native Method) > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) > org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222) > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633) > org.apache.spark.SparkContext.runJob(SparkContext.scala:2030) > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:84) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) > org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) >
[jira] [Comment Edited] (SPARK-23747) Add EpochCoordinator unit tests
[ https://issues.apache.org/jira/browse/SPARK-23747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422613#comment-16422613 ] Jose Torres edited comment on SPARK-23747 at 4/2/18 3:26 PM: - I mean testing the internal logic. We'd want to pull in Mockito and mock out reader, writer, and query, then test various sequences of RPC calls. If you're willing, it'd definitely be nice to do this first so SPARK-23503 can get a test. was (Author: joseph.torres): I mean testing the internal logic. We'd want to pull in Mockito and mock out reader, writer, and query, testing various sequences of RPC calls. If you're willing, it'd definitely be nice to do this first so SPARK-23503 can get a test. > Add EpochCoordinator unit tests > --- > > Key: SPARK-23747 > URL: https://issues.apache.org/jira/browse/SPARK-23747 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23747) Add EpochCoordinator unit tests
[ https://issues.apache.org/jira/browse/SPARK-23747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422613#comment-16422613 ] Jose Torres commented on SPARK-23747: - I mean testing the internal logic. We'd want to pull in Mockito and mock out reader, writer, and query, testing various sequences of RPC calls. If you're willing, it'd definitely be nice to do this first so SPARK-23503 can get a test. > Add EpochCoordinator unit tests > --- > > Key: SPARK-23747 > URL: https://issues.apache.org/jira/browse/SPARK-23747 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23826) TestHiveSparkSession should set default session
Jose Torres created SPARK-23826: --- Summary: TestHiveSparkSession should set default session Key: SPARK-23826 URL: https://issues.apache.org/jira/browse/SPARK-23826 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Jose Torres The fix for TestSparkSession breaks hive/testOnly, because many of the tests both instantiate a TestHiveSparkSession and call SparkSession.getOrCreate(). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23808) Test spark sessions should set default session
Jose Torres created SPARK-23808: --- Summary: Test spark sessions should set default session Key: SPARK-23808 URL: https://issues.apache.org/jira/browse/SPARK-23808 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Jose Torres SparkSession.getOrCreate() ensures that the session it returns is set as the default session. Test code (TestSparkSession and TestHiveSparkSession) shortcuts around this method, and thus a default is never set. We need to set it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
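The invariant being restored can be sketched as follows. This is a hedged example, not code from the ticket; the `local[1]` master and the assertion are illustrative.

{code:java}
import org.apache.spark.sql.SparkSession

// getOrCreate() registers the session it returns as the default session.
val spark = SparkSession.builder().master("local[1]").getOrCreate()
assert(SparkSession.getDefaultSession.contains(spark))

// TestSparkSession / TestHiveSparkSession are constructed directly,
// bypassing getOrCreate(), so getDefaultSession stays None unless the
// test harness sets it explicitly -- which is what this ticket proposes.
{code}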
[jira] [Created] (SPARK-23788) Race condition in StreamingQuerySuite
Jose Torres created SPARK-23788: --- Summary: Race condition in StreamingQuerySuite Key: SPARK-23788 URL: https://issues.apache.org/jira/browse/SPARK-23788 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres The serializability test uses the same MemoryStream instance for 3 different queries. If any of those queries asks it to commit before the others have run, the rest will see empty dataframes. This can fail the test if q3 is affected. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
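The shape of the race can be sketched as follows. The query bodies and names are illustrative, not the actual suite code; the point is the single shared MemoryStream.

{code:java}
import org.apache.spark.sql.execution.streaming.MemoryStream

// Illustrative: one MemoryStream shared by several queries (needs an
// implicit SQLContext and spark.implicits._ in scope).
val input = MemoryStream[Int]
val q1 = input.toDS().writeStream.format("memory").queryName("q1").start()
val q2 = input.toDS().writeStream.format("memory").queryName("q2").start()
val q3 = input.toDS().writeStream.format("memory").queryName("q3").start()

input.addData(1, 2, 3)
// If q1 processes the batch and asks the stream to commit it before q2 and
// q3 have read it, the committed data is dropped and q2/q3 see empty batches.
{code}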
[jira] [Created] (SPARK-23748) Support select from temp tables
Jose Torres created SPARK-23748: --- Summary: Support select from temp tables Key: SPARK-23748 URL: https://issues.apache.org/jira/browse/SPARK-23748 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres As reported in the dev list, the following currently fails:

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
jdf.createOrReplaceTempView("table")
val resultdf = spark.sql("select * from table")
resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.Continuous("1 second")).start()

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23747) Add EpochCoordinator unit tests
Jose Torres created SPARK-23747: --- Summary: Add EpochCoordinator unit tests Key: SPARK-23747 URL: https://issues.apache.org/jira/browse/SPARK-23747 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23703) Collapse sequential watermarks
Jose Torres created SPARK-23703: --- Summary: Collapse sequential watermarks Key: SPARK-23703 URL: https://issues.apache.org/jira/browse/SPARK-23703 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres When there are two sequential EventTimeWatermark nodes in a query plan, the topmost one overrides the column tracking metadata from its children, but leaves the nodes themselves untouched. When there is no intervening stateful operation to consume the watermark, we should remove the lower node entirely. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23702) Forbid watermarks on both sides of a streaming aggregate
Jose Torres created SPARK-23702: --- Summary: Forbid watermarks on both sides of a streaming aggregate Key: SPARK-23702 URL: https://issues.apache.org/jira/browse/SPARK-23702 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23701) Multiple sequential watermarks are not supported
Jose Torres created SPARK-23701: --- Summary: Multiple sequential watermarks are not supported Key: SPARK-23701 URL: https://issues.apache.org/jira/browse/SPARK-23701 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres In 2.3, we allowed query plans with multiple watermarks to run to enable stream-stream joins. But we've only implemented the functionality for watermarks in parallel feeding into a join operator. It won't work currently (and would require in-depth changes) if the watermarks are sequential in the plan. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23688) Refactor tests away from rate source
Jose Torres created SPARK-23688: --- Summary: Refactor tests away from rate source Key: SPARK-23688 URL: https://issues.apache.org/jira/browse/SPARK-23688 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres Most continuous processing tests currently use a rate source, since that was what was available at the time of implementation. This forces us to do a lot of awkward things to work around the fact that the data in the sink is not perfectly predictable. We should refactor to use a memory stream once it's implemented. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23687) Add MemoryStream
Jose Torres created SPARK-23687: --- Summary: Add MemoryStream Key: SPARK-23687 URL: https://issues.apache.org/jira/browse/SPARK-23687 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres We need a MemoryStream for continuous processing, both in order to write less fragile tests and to eventually use existing stream tests to verify functional equivalence. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23556) design doc for write side
[ https://issues.apache.org/jira/browse/SPARK-23556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393300#comment-16393300 ] Jose Torres commented on SPARK-23556: - Doc: https://docs.google.com/document/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE > design doc for write side > - > > Key: SPARK-23556 > URL: https://issues.apache.org/jira/browse/SPARK-23556 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23325) DataSourceV2 readers should always produce InternalRow.
[ https://issues.apache.org/jira/browse/SPARK-23325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388399#comment-16388399 ] Jose Torres commented on SPARK-23325: - How hard would it be to just declare that InternalRow is stable? The file has been touched about once per year for the past 3 years, and I doubt we'd be able to change it to any significant degree without risking serious regressions. From my perspective, and I think (but correct me if I'm wrong) the perspective of the SPIP, a stable interface which can match the performance of Spark's internal data sources is one of the core goals of DataSourceV2. If high-performance sources must implement InternalRow reads and writes, then DataSourceV2 isn't stable until InternalRow is stable anyway. > DataSourceV2 readers should always produce InternalRow. > --- > > Key: SPARK-23325 > URL: https://issues.apache.org/jira/browse/SPARK-23325 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ryan Blue >Priority: Major > > DataSourceV2 row-oriented implementations are limited to producing either > {{Row}} instances or {{UnsafeRow}} instances by implementing > {{SupportsScanUnsafeRow}}. Instead, I think that implementations should > always produce {{InternalRow}}. > The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither > one is appropriate for implementers. > File formats don't produce {{Row}} instances or the data values used by > {{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation > that uses {{Row}} instances must produce data that is immediately translated > from the representation that was just produced by Spark. In my experience, it > made little sense to translate a timestamp in microseconds to a > (milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass > that instance to Spark for immediate translation back. 
> On the other hand, {{UnsafeRow}} is very difficult to produce unless data is > already held in memory. Even the Parquet support built into Spark > deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce > unsafe rows. When I went to build an implementation that deserializes Parquet > or Avro directly to {{UnsafeRow}} (I tried both), I found that it couldn't be > done without first deserializing into memory because the size of an array > must be known before any values are written. > I ended up deciding to deserialize to {{InternalRow}} and use > {{UnsafeProjection}} to convert to unsafe. There are two problems with this: > first, this is Scala and was difficult to call from Java (it required > reflection), and second, this causes double projection in the physical plan > (a copy for unsafe to unsafe) if there is a projection that wasn't fully > pushed to the data source. > I think the solution is to have a single interface for readers that expects > {{InternalRow}}. Then, a projection should be added in the Spark plan to > convert to unsafe and avoid projection in the plan and in the data source. If > the data source already produces unsafe rows by deserializing directly, this > still minimizes the number of copies because the unsafe projection will check > whether the incoming data is already {{UnsafeRow}}. > Using {{InternalRow}} would also match the interface on the write side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23574) SinglePartition in data source V2 scan
[ https://issues.apache.org/jira/browse/SPARK-23574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres updated SPARK-23574: Component/s: (was: Structured Streaming) Spark Core > SinglePartition in data source V2 scan > -- > > Key: SPARK-23574 > URL: https://issues.apache.org/jira/browse/SPARK-23574 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > DataSourceV2ScanExec currently reports UnknownPartitioning whenever the > reader doesn't mix in SupportsReportPartitioning. It can also report > SinglePartition in the case where there's a single reader factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23574) SinglePartition in data source V2 scan
Jose Torres created SPARK-23574: --- Summary: SinglePartition in data source V2 scan Key: SPARK-23574 URL: https://issues.apache.org/jira/browse/SPARK-23574 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres DataSourceV2ScanExec currently reports UnknownPartitioning whenever the reader doesn't mix in SupportsReportPartitioning. It can also report SinglePartition in the case where there's a single reader factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23561) make StreamWriter not a DataSourceWriter subclass
Jose Torres created SPARK-23561: --- Summary: make StreamWriter not a DataSourceWriter subclass Key: SPARK-23561 URL: https://issues.apache.org/jira/browse/SPARK-23561 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres The inheritance makes little sense now; they've almost entirely diverged. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23559) add epoch ID to data writer factory
Jose Torres created SPARK-23559: --- Summary: add epoch ID to data writer factory Key: SPARK-23559 URL: https://issues.apache.org/jira/browse/SPARK-23559 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres To support the StreamWriter lifecycle described in SPARK-22910, epoch ID has to be specifiable at DataWriter creation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23558) clean up StreamWriter factory lifecycle
Jose Torres created SPARK-23558: --- Summary: clean up StreamWriter factory lifecycle Key: SPARK-23558 URL: https://issues.apache.org/jira/browse/SPARK-23558 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres Right now, StreamWriter and children have different lifecycles in continuous processing and microbatch mode. Both execution modes impose significant constraints on what that lifecycle must be, so the achievable consistent semantic is: * StreamWriter lasts for the duration of the query execution * DataWriterFactory lasts for the duration of the query execution * DataWriter (the task-level writer) has a lifecycle tied to each individual epoch This also allows us to restore the implicit semantic that DataWriter.commit()/abort() terminates the lifecycle. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23557) design doc for read side
Jose Torres created SPARK-23557: --- Summary: design doc for read side Key: SPARK-23557 URL: https://issues.apache.org/jira/browse/SPARK-23557 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23556) design doc for write side
Jose Torres created SPARK-23556: --- Summary: design doc for write side Key: SPARK-23556 URL: https://issues.apache.org/jira/browse/SPARK-23556 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23504) Flaky test: RateSourceV2Suite.basic microbatch execution
[ https://issues.apache.org/jira/browse/SPARK-23504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377897#comment-16377897 ] Jose Torres edited comment on SPARK-23504 at 2/27/18 1:56 AM: -- I noticed this happening too, but I've been unable to reproduce it locally after hundreds of runs. was (Author: joseph.torres): I've > Flaky test: RateSourceV2Suite.basic microbatch execution > > > Key: SPARK-23504 > URL: https://issues.apache.org/jira/browse/SPARK-23504 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Priority: Major > > Seen on an unrelated change: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport/org.apache.spark.sql.execution.streaming/RateSourceV2Suite/basic_microbatch_execution/ > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: == Results == !== Correct > Answer - 10 == == Spark Answer - 0 == !struct<_1:timestamp,_2:int> > struct<> ![1969-12-31 16:00:00.0,0] ![1969-12-31 16:00:00.1,1] > ![1969-12-31 16:00:00.2,2] ![1969-12-31 16:00:00.3,3] ![1969-12-31 > 16:00:00.4,4] ![1969-12-31 16:00:00.5,5] ![1969-12-31 16:00:00.6,6] > ![1969-12-31 16:00:00.7,7] ![1969-12-31 16:00:00.8,8] > ![1969-12-31 16:00:00.9,9]== Progress == > AdvanceRateManualClock(1) => CheckLastBatch: [1969-12-31 > 16:00:00.0,0],[1969-12-31 16:00:00.1,1],[1969-12-31 16:00:00.2,2],[1969-12-31 > 16:00:00.3,3],[1969-12-31 16:00:00.4,4],[1969-12-31 16:00:00.5,5],[1969-12-31 > 16:00:00.6,6],[1969-12-31 16:00:00.7,7],[1969-12-31 16:00:00.8,8],[1969-12-31 > 16:00:00.9,9]StopStream > StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@22bc97a,Map(),null) > AdvanceRateManualClock(2)CheckLastBatch: [1969-12-31 > 16:00:01.0,10],[1969-12-31 16:00:01.1,11],[1969-12-31 > 16:00:01.2,12],[1969-12-31 16:00:01.3,13],[1969-12-31 > 16:00:01.4,14],[1969-12-31 16:00:01.5,15],[1969-12-31 > 16:00:01.6,16],[1969-12-31 16:00:01.7,17],[1969-12-31 > 
16:00:01.8,18],[1969-12-31 16:00:01.9,19] == Stream == Output Mode: Append > Stream state: > {org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchReader@75b88292: > {"0":{"value":-1,"runTimeMs":0}}} Thread state: alive Thread stack trace: > sun.misc.Unsafe.park(Native Method) > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) > org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222) > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633) > org.apache.spark.SparkContext.runJob(SparkContext.scala:2030) > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:84) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) > 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) > org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) >
[jira] [Commented] (SPARK-23504) Flaky test: RateSourceV2Suite.basic microbatch execution
[ https://issues.apache.org/jira/browse/SPARK-23504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377897#comment-16377897 ] Jose Torres commented on SPARK-23504: - I've > Flaky test: RateSourceV2Suite.basic microbatch execution > > > Key: SPARK-23504 > URL: https://issues.apache.org/jira/browse/SPARK-23504 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Priority: Major > > Seen on an unrelated change: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport/org.apache.spark.sql.execution.streaming/RateSourceV2Suite/basic_microbatch_execution/ > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: == Results == !== Correct > Answer - 10 == == Spark Answer - 0 == !struct<_1:timestamp,_2:int> > struct<> ![1969-12-31 16:00:00.0,0] ![1969-12-31 16:00:00.1,1] > ![1969-12-31 16:00:00.2,2] ![1969-12-31 16:00:00.3,3] ![1969-12-31 > 16:00:00.4,4] ![1969-12-31 16:00:00.5,5] ![1969-12-31 16:00:00.6,6] > ![1969-12-31 16:00:00.7,7] ![1969-12-31 16:00:00.8,8] > ![1969-12-31 16:00:00.9,9]== Progress == > AdvanceRateManualClock(1) => CheckLastBatch: [1969-12-31 > 16:00:00.0,0],[1969-12-31 16:00:00.1,1],[1969-12-31 16:00:00.2,2],[1969-12-31 > 16:00:00.3,3],[1969-12-31 16:00:00.4,4],[1969-12-31 16:00:00.5,5],[1969-12-31 > 16:00:00.6,6],[1969-12-31 16:00:00.7,7],[1969-12-31 16:00:00.8,8],[1969-12-31 > 16:00:00.9,9]StopStream > StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@22bc97a,Map(),null) > AdvanceRateManualClock(2)CheckLastBatch: [1969-12-31 > 16:00:01.0,10],[1969-12-31 16:00:01.1,11],[1969-12-31 > 16:00:01.2,12],[1969-12-31 16:00:01.3,13],[1969-12-31 > 16:00:01.4,14],[1969-12-31 16:00:01.5,15],[1969-12-31 > 16:00:01.6,16],[1969-12-31 16:00:01.7,17],[1969-12-31 > 16:00:01.8,18],[1969-12-31 16:00:01.9,19] == Stream == Output Mode: Append > Stream state: > 
{org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchReader@75b88292: > {"0":{"value":-1,"runTimeMs":0}}} Thread state: alive Thread stack trace: > sun.misc.Unsafe.park(Native Method) > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) > org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222) > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633) > org.apache.spark.SparkContext.runJob(SparkContext.scala:2030) > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:84) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) > 
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) > org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$18.apply(MicroBatchExecution.scala:493) > > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > >
[jira] [Created] (SPARK-23503) continuous execution should sequence committed epochs
Jose Torres created SPARK-23503: --- Summary: continuous execution should sequence committed epochs Key: SPARK-23503 URL: https://issues.apache.org/jira/browse/SPARK-23503 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres Currently, the EpochCoordinator doesn't enforce a commit order. If a message for epoch n gets lost in the ether, and epoch n + 1 happens to be ready for commit earlier, epoch n + 1 will be committed. This is either incorrect or needlessly confusing, because it's not safe to start from the end offset of epoch n + 1 until epoch n is committed. EpochCoordinator should enforce this sequencing. Note that this is not actually a problem right now, because the commit messages go through the same RPC channel from the same place. But we shouldn't implicitly bake this assumption in.
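The sequencing requirement above can be sketched as a small buffering coordinator: epochs that become ready out of order are held back, and commits are emitted only in strict sequence. This is a hedged illustration of the idea, not Spark's actual EpochCoordinator API; the class and method names are hypothetical.

```python
class EpochSequencer:
    """Commit epochs strictly in order, buffering any that arrive early."""

    def __init__(self, first_epoch=0):
        self.next_to_commit = first_epoch
        self.ready = set()    # epochs reported ready but not yet committable
        self.committed = []   # the commit log, always sequential

    def epoch_ready(self, epoch):
        """Called when an epoch's partitions have all reported completion."""
        self.ready.add(epoch)
        # Drain every epoch that is now committable, in order.
        while self.next_to_commit in self.ready:
            self.ready.remove(self.next_to_commit)
            self.committed.append(self.next_to_commit)
            self.next_to_commit += 1


seq = EpochSequencer()
seq.epoch_ready(1)               # epoch 1 is ready first: it must wait
assert seq.committed == []
seq.epoch_ready(0)               # now both commit, in sequence
assert seq.committed == [0, 1]
```

With this shape, losing the "ready" message for epoch n simply stalls the commit log at n, rather than committing n + 1 out of order.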
[jira] [Created] (SPARK-23491) continuous symptom
Jose Torres created SPARK-23491: --- Summary: continuous symptom Key: SPARK-23491 URL: https://issues.apache.org/jira/browse/SPARK-23491 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres
[jira] [Commented] (SPARK-23369) HiveClientSuites fails with unresolved dependency
[ https://issues.apache.org/jira/browse/SPARK-23369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370832#comment-16370832 ] Jose Torres commented on SPARK-23369: - Still seeing this in e.g. https://github.com/apache/spark/pull/20646 > HiveClientSuites fails with unresolved dependency > - > > Key: SPARK-23369 > URL: https://issues.apache.org/jira/browse/SPARK-23369 > Project: Spark > Issue Type: Test > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenchen Fan >Priority: Major > > I saw it multiple times in PR builders. The error message is: > > {code:java} > sbt.ForkMain$ForkError: java.lang.RuntimeException: [unresolved dependency: > com.sun.jersey#jersey-json;1.14: configuration not found in > com.sun.jersey#jersey-json;1.14: 'master(compile)'. Missing configuration: > 'compile'. It was required from org.apache.hadoop#hadoop-yarn-common;2.6.5 > compile] at > org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1270) > at > org.apache.spark.sql.hive.client.IsolatedClientLoader$$anonfun$2.apply(IsolatedClientLoader.scala:113) > at > org.apache.spark.sql.hive.client.IsolatedClientLoader$$anonfun$2.apply(IsolatedClientLoader.scala:113) > at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42) at > org.apache.spark.sql.hive.client.IsolatedClientLoader$.downloadVersion(IsolatedClientLoader.scala:112) > at > org.apache.spark.sql.hive.client.IsolatedClientLoader$.liftedTree1$1(IsolatedClientLoader.scala:74) > at > org.apache.spark.sql.hive.client.IsolatedClientLoader$.forVersion(IsolatedClientLoader.scala:62) > at > org.apache.spark.sql.hive.client.HiveClientBuilder$.buildClient(HiveClientBuilder.scala:51) > at > org.apache.spark.sql.hive.client.HiveVersionSuite.buildClient(HiveVersionSuite.scala:41) > at > org.apache.spark.sql.hive.client.HiveClientSuite.org$apache$spark$sql$hive$client$HiveClientSuite$$init(HiveClientSuite.scala:48) > at > 
org.apache.spark.sql.hive.client.HiveClientSuite.beforeAll(HiveClientSuite.scala:71) > at > org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:212) > at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210) at > org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52) at > org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1210) at > org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1257) at > org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1255) at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at > org.scalatest.Suite$class.runNestedSuites(Suite.scala:1255) at > org.apache.spark.sql.hive.client.HiveClientSuites.runNestedSuites(HiveClientSuites.scala:24) > at org.scalatest.Suite$class.run(Suite.scala:1144) at > org.apache.spark.sql.hive.client.HiveClientSuites.run(HiveClientSuites.scala:24) > at > org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314) > at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480) > at sbt.ForkMain$Run$2.call(ForkMain.java:296) at > sbt.ForkMain$Run$2.call(ForkMain.java:286) at > java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745){code}
[jira] [Created] (SPARK-23444) would like to be able to cancel jobs cleanly
Jose Torres created SPARK-23444: --- Summary: would like to be able to cancel jobs cleanly Key: SPARK-23444 URL: https://issues.apache.org/jira/browse/SPARK-23444 Project: Spark Issue Type: Wish Components: Spark Core Affects Versions: 2.4.0 Reporter: Jose Torres In Structured Streaming, we often need to cancel a Spark job in order to close the stream. SparkContext does not (as far as I can tell) provide a runJob handle which cleanly signals when a job was cancelled; it simply throws a generic SparkException. So we're forced to awkwardly parse this SparkException in order to determine whether the job failed because of a cancellation (which we expect and want to swallow) or another error (which we want to propagate).
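The awkward pattern this wish describes can be sketched as follows: because cancellation surfaces only as a generic exception, the caller has to match on the message text to decide whether to swallow or propagate. All names here are hypothetical stand-ins, not Spark's real API; the point is the fragility of message parsing.

```python
class SparkException(Exception):
    """Stand-in for the generic exception a cancelled or failed job throws."""


def run_job_and_swallow_cancellation(run_job):
    """Run a job, swallowing the exception only if it looks like a cancellation.

    Parsing the message like this is exactly the fragile workaround the
    ticket complains about; a dedicated cancellation signal would avoid it.
    """
    try:
        return run_job()
    except SparkException as e:
        if "cancelled" in str(e).lower():
            return None   # expected: the stream is shutting down
        raise             # a real error: propagate it


def cancelled_job():
    raise SparkException("Job 3 cancelled as part of cancelled job group")


def failed_job():
    raise SparkException("Task failed due to executor loss")


assert run_job_and_swallow_cancellation(cancelled_job) is None
```

If the exception message wording ever changes, this check silently misclassifies cancellations as errors, which is why a structured signal would be preferable.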
[jira] [Created] (SPARK-23441) Remove interrupts from ContinuousExecution
Jose Torres created SPARK-23441: --- Summary: Remove interrupts from ContinuousExecution Key: SPARK-23441 URL: https://issues.apache.org/jira/browse/SPARK-23441 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres The reason StreamExecution interrupts the query execution thread is that, for the microbatch case, nontrivial work goes on in that thread to construct a batch. In ContinuousExecution, this doesn't apply. Once the state is flipped from ACTIVE and the underlying job is cancelled, the query execution thread will immediately go to cleanup. So we don't need to call queryExecutionThread.interrupt() at all there.
[jira] [Created] (SPARK-23440) Clean up StreamExecution interrupts
Jose Torres created SPARK-23440: --- Summary: Clean up StreamExecution interrupts Key: SPARK-23440 URL: https://issues.apache.org/jira/browse/SPARK-23440 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres StreamExecution currently heavily leverages interrupt() to stop the query execution thread. But the query execution thread is sometimes in the middle of a context that will wrap or convert the InterruptedException, so we maintain a whitelist of exceptions that we think indicate an exception caused by stop rather than an error condition. This is awkward and probably fragile.
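The whitelist approach described above amounts to walking an exception's cause chain and checking each layer against types known to indicate an interrupt. A minimal sketch, assuming illustrative stand-in classes (not the actual JVM exceptions Spark matches):

```python
class Interrupted(Exception):
    """Stand-in for InterruptedException."""


class ClosedByInterrupt(Exception):
    """Stand-in for an I/O exception raised when an interrupt closes a channel."""


class ExecutionError(Exception):
    """Stand-in for a wrapper exception that hides the real cause."""


# The whitelist: exception types taken to mean "stop() interrupted us".
INTERRUPT_WHITELIST = (Interrupted, ClosedByInterrupt)


def is_interrupted_by_stop(exc):
    """Walk the cause chain (like Throwable.getCause) looking for a whitelisted type."""
    while exc is not None:
        if isinstance(exc, INTERRUPT_WHITELIST):
            return True
        exc = exc.__cause__
    return False


wrapped = ExecutionError("query terminated")
wrapped.__cause__ = Interrupted()
assert is_interrupted_by_stop(wrapped)
assert not is_interrupted_by_stop(ExecutionError("real failure"))
```

The fragility is structural: any new wrapper in the execution path that converts the interrupt into a type outside the whitelist (as the DataSourceV2 write path did) silently turns a clean stop into a reported error.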
[jira] [Created] (SPARK-23417) pyspark tests give wrong sbt instructions
Jose Torres created SPARK-23417: --- Summary: pyspark tests give wrong sbt instructions Key: SPARK-23417 URL: https://issues.apache.org/jira/browse/SPARK-23417 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.4.0 Reporter: Jose Torres When running python/run-tests, the script indicates that I must run "'build/sbt assembly/package streaming-kafka-0-8-assembly/assembly' or 'build/mvn -Pkafka-0-8 package'". The sbt command fails: [error] Expected ID character [error] Not a valid command: streaming-kafka-0-8-assembly [error] Expected project ID [error] Expected configuration [error] Expected ':' (if selecting a configuration) [error] Expected key [error] Not a valid key: streaming-kafka-0-8-assembly [error] streaming-kafka-0-8-assembly/assembly [error]
[jira] [Comment Edited] (SPARK-23416) flaky test: org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite.stress test for failOnDataLoss=false
[ https://issues.apache.org/jira/browse/SPARK-23416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362941#comment-16362941 ] Jose Torres edited comment on SPARK-23416 at 2/13/18 7:48 PM: -- I think I see the problem. * StreamExecution.stop() works by interrupting the stream execution thread. This is not safe in general, and can throw any variety of exceptions. * StreamExecution.isInterruptedByStop() solves this problem by implementing a whitelist of exceptions which indicate the stop() happened. * The v2 write path adds calls to ThreadUtils.awaitResult(), which weren't in the V1 write path and (if the interrupt happens to fall in them) throw a new exception which isn't accounted for. I'm going to write a PR to add another whitelist entry. This whole edifice is a bit fragile, but I don't have a good solution for that. was (Author: joseph.torres): I think I see the problem. * StreamExecution.stop() works by interrupting the stream execution thread. This is not safe in general, and can throw any variety of exceptions. * StreamExecution.isInterruptedByStop() solves this problem by implementing a whitelist of exceptions which indicate the stop() happened. * The v2 write path adds calls to ThreadUtils.awaitResult(), which weren't in the V1 write path and (if the interrupt happens to fall in them) throw a new exception which isn't accounted for. I'm going to write a PR to add another whitelist entry, but this is quite fragile. > flaky test: > org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite.stress > test for failOnDataLoss=false > -- > > Key: SPARK-23416 > URL: https://issues.apache.org/jira/browse/SPARK-23416 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Minor > > I suspect this is a race condition latent in the DataSourceV2 write path, or > at least the interaction of that write path with StreamTest. 
> [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport/org.apache.spark.sql.kafka010/KafkaSourceStressForDontFailOnDataLossSuite/stress_test_for_failOnDataLoss_false/] > h3. Error Message > org.apache.spark.sql.streaming.StreamingQueryException: Query [id = > 16b2a2b1-acdd-44ec-902f-531169193169, runId = > 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job > aborted. > h3. Stacktrace > sbt.ForkMain$ForkError: > org.apache.spark.sql.streaming.StreamingQueryException: Query [id = > 16b2a2b1-acdd-44ec-902f-531169193169, runId = > 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job > aborted. at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) > Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Writing > job aborted. 
at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:108) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > at > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) > at > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) > at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) at > org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$15.apply(MicroBatchExecution.scala:488) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) >
[jira] [Commented] (SPARK-23416) flaky test: org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite.stress test for failOnDataLoss=false
[ https://issues.apache.org/jira/browse/SPARK-23416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362941#comment-16362941 ] Jose Torres commented on SPARK-23416: - I think I see the problem. * StreamExecution.stop() works by interrupting the stream execution thread. This is not safe in general, and can throw any variety of exceptions. * StreamExecution.isInterruptedByStop() solves this problem by implementing a whitelist of exceptions which indicate the stop() happened. * The v2 write path adds calls to ThreadUtils.awaitResult(), which weren't in the V1 write path and (if the interrupt happens to fall in them) throw a new exception which isn't accounted for. I'm going to write a PR to add another whitelist entry, but this is quite fragile. > flaky test: > org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite.stress > test for failOnDataLoss=false > -- > > Key: SPARK-23416 > URL: https://issues.apache.org/jira/browse/SPARK-23416 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Minor > > I suspect this is a race condition latent in the DataSourceV2 write path, or > at least the interaction of that write path with StreamTest. > [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport/org.apache.spark.sql.kafka010/KafkaSourceStressForDontFailOnDataLossSuite/stress_test_for_failOnDataLoss_false/] > h3. Error Message > org.apache.spark.sql.streaming.StreamingQueryException: Query [id = > 16b2a2b1-acdd-44ec-902f-531169193169, runId = > 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job > aborted. > h3. Stacktrace > sbt.ForkMain$ForkError: > org.apache.spark.sql.streaming.StreamingQueryException: Query [id = > 16b2a2b1-acdd-44ec-902f-531169193169, runId = > 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job > aborted. 
at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) > Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Writing > job aborted. at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:108) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > at > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) > at > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) > at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) at > org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$15.apply(MicroBatchExecution.scala:488) > at > 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:483) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:482) > at >
[jira] [Created] (SPARK-23416) flaky test: org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite.stress test for failOnDataLoss=false
Jose Torres created SPARK-23416: --- Summary: flaky test: org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite.stress test for failOnDataLoss=false Key: SPARK-23416 URL: https://issues.apache.org/jira/browse/SPARK-23416 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Jose Torres I suspect this is a race condition latent in the DataSourceV2 write path, or at least the interaction of that write path with StreamTest. [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport/org.apache.spark.sql.kafka010/KafkaSourceStressForDontFailOnDataLossSuite/stress_test_for_failOnDataLoss_false/] h3. Error Message org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 16b2a2b1-acdd-44ec-902f-531169193169, runId = 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job aborted. h3. Stacktrace sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 16b2a2b1-acdd-44ec-902f-531169193169, runId = 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job aborted. at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Writing job aborted. 
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:108) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) at org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$15.apply(MicroBatchExecution.scala:488) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:483) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:482) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) at
[jira] [Commented] (SPARK-23096) Migrate rate source to v2
[ https://issues.apache.org/jira/browse/SPARK-23096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358143#comment-16358143 ] Jose Torres commented on SPARK-23096: - Sure! Happy to have help. The "ratev2" source is just something I hacked together to exercise the v2 streaming execution path. You're right that it can really be replaced with a fully migrated version of the v1 source. > Migrate rate source to v2 > - > > Key: SPARK-23096 > URL: https://issues.apache.org/jira/browse/SPARK-23096 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major >
[jira] [Updated] (SPARK-23221) Fix KafkaContinuousSourceStressForDontFailOnDataLossSuite to run with enough cores
[ https://issues.apache.org/jira/browse/SPARK-23221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Torres updated SPARK-23221: Description: Currently, `KafkaContinuousSourceStressForDontFailOnDataLossSuite` runs on only 2 cores. It needs more, because continuous execution requires 1 core per topic partition. > Fix KafkaContinuousSourceStressForDontFailOnDataLossSuite to run with enough > cores > -- > > Key: SPARK-23221 > URL: https://issues.apache.org/jira/browse/SPARK-23221 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Jose Torres >Priority: Major > > Currently, `KafkaContinuousSourceStressForDontFailOnDataLossSuite` runs on > only 2 cores. It needs more, because continuous execution requires 1 core per > topic partition.
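Because continuous execution pins one long-running task per topic partition, a test's local master must advertise at least that many cores. A tiny sketch of the sizing arithmetic, assuming a hypothetical helper (the exact headroom needed is an illustration, not Spark's scheduler accounting):

```python
def master_for_continuous(num_partitions, extra_cores=1):
    """Build a local[N] master string with N = partitions + headroom.

    Continuous execution keeps one core busy per topic partition for the
    lifetime of the query, so anything less than num_partitions cores
    means some partitions never get scheduled and the query hangs.
    """
    return f"local[{num_partitions + extra_cores}]"


assert master_for_continuous(4) == "local[5]"
```

This is the kind of mismatch the ticket describes: a 2-core master with more than 2 topic partitions starves the continuous tasks.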
[jira] [Created] (SPARK-23221) Fix KafkaContinuousSourceStressForDontFailOnDataLossSuite to run with enough cores
Jose Torres created SPARK-23221:
-----------------------------------

             Summary: Fix KafkaContinuousSourceStressForDontFailOnDataLossSuite to run with enough cores
                 Key: SPARK-23221
                 URL: https://issues.apache.org/jira/browse/SPARK-23221
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.3.0
            Reporter: Jose Torres
[jira] [Created] (SPARK-23196) Unify continuous and microbatch V2 sinks
Jose Torres created SPARK-23196:
-----------------------------------

             Summary: Unify continuous and microbatch V2 sinks
                 Key: SPARK-23196
                 URL: https://issues.apache.org/jira/browse/SPARK-23196
             Project: Spark
          Issue Type: Sub-task
          Components: Structured Streaming
    Affects Versions: 2.4.0
            Reporter: Jose Torres

We currently have separate MicroBatchWriteSupport and ContinuousWriteSupport interfaces. These do fundamentally the same thing, and existing implementations are nearly identical apart from the slightly different interfaces they target. We should just unify them as StreamWriteSupport.
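To make the unification concrete, here is a hedged sketch using simplified stand-in types rather than the real Spark interfaces (the trait and method signatures below are illustrative; the actual StreamWriteSupport API differs). The point it demonstrates: since microbatch and continuous writers only differ in how commits are sequenced, a single sink entry point can serve both execution modes, with the caller supplying a batch id or an epoch id as appropriate.

```scala
// Simplified stand-in for a commit message returned by each writer task.
final case class WriterCommitMessage(partitionId: Int, rows: Long)

// One writer interface for both modes: commit is driven per batch in
// microbatch execution and per epoch in continuous execution.
trait StreamWriter {
  def commit(epochId: Long, messages: Seq[WriterCommitMessage]): Unit
  def abort(epochId: Long, messages: Seq[WriterCommitMessage]): Unit
}

// Single entry point replacing the separate MicroBatchWriteSupport and
// ContinuousWriteSupport factories (signature simplified for illustration).
trait StreamWriteSupport {
  def createStreamWriter(queryId: String): StreamWriter
}

// Toy sink that just tallies committed rows, to show one implementation
// serving both execution modes.
object InMemoryStreamSink extends StreamWriteSupport {
  @volatile var committedRows: Long = 0L

  def createStreamWriter(queryId: String): StreamWriter = new StreamWriter {
    def commit(epochId: Long, messages: Seq[WriterCommitMessage]): Unit =
      committedRows += messages.map(_.rows).sum
    def abort(epochId: Long, messages: Seq[WriterCommitMessage]): Unit = ()
  }
}
```

Under this shape, a sink author writes the commit/abort logic once; whether `epochId` counts batches or continuous-processing epochs is the engine's concern, not the sink's.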
[jira] [Commented] (SPARK-23097) Migrate text socket source
[ https://issues.apache.org/jira/browse/SPARK-23097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331726#comment-16331726 ]

Jose Torres commented on SPARK-23097:
-------------------------------------

Certainly! There's no specific plan right now. I'm working on a list of pointers for migrating sources; shoot me an email if you want a link to a rough draft.

> Migrate text socket source
> --------------------------
>
>                 Key: SPARK-23097
>                 URL: https://issues.apache.org/jira/browse/SPARK-23097
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Jose Torres
>            Priority: Major
>
[jira] [Created] (SPARK-23102) Migrate kafka sink
Jose Torres created SPARK-23102:
-----------------------------------

             Summary: Migrate kafka sink
                 Key: SPARK-23102
                 URL: https://issues.apache.org/jira/browse/SPARK-23102
             Project: Spark
          Issue Type: Sub-task
          Components: Structured Streaming
    Affects Versions: 2.4.0
            Reporter: Jose Torres