[GitHub] spark issue #16043: [SPARK-18601][SQL] Simplify Create/Get complex expressio...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16043 **[Test build #72209 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72209/testReport)** for PR 16043 at commit [`bd27a2b`](https://github.com/apache/spark/commit/bd27a2b029c9b9f4e7743446a5bfab4ab14dfca8). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16740: [SPARK-19400][ML] Allow GLM to handle intercept only mod...
Github user actuaryzhang commented on the issue:

    https://github.com/apache/spark/pull/16740

    @sethah Yes, we can directly compute the intercept easily. But I'm concerned that such special handling may not integrate well with other features or future changes. For example, we will need to compute the standard error analytically as well, which is not difficult. But the point is that every time there is a new feature, one would have to modify the intercept calculation part to handle it. This does not seem efficient. Thoughts?
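For readers following the thread, the "direct" computation being discussed can be sketched as follows. With only an intercept, no offset, and prior weights w_i, the score equation reduces to sum_i w_i (y_i - mu) = 0, so the fitted mean is the weighted mean of the response and the intercept is the link function applied to it. This is a minimal sketch under those assumptions; `InterceptOnlyGlm`, `weightedMean` and `intercept` are hypothetical names, not code from the PR.

```scala
object InterceptOnlyGlm {
  /** Weighted mean of the response: the fitted mean of an intercept-only GLM (assumes no offset). */
  def weightedMean(y: Seq[Double], w: Seq[Double]): Double = {
    require(y.nonEmpty && y.length == w.length, "y and w must be non-empty and of equal length")
    val wSum = w.sum
    require(wSum > 0.0, "weights must sum to a positive value")
    y.zip(w).map { case (yi, wi) => yi * wi }.sum / wSum
  }

  /** Intercept on the link scale: link(weighted mean). */
  def intercept(y: Seq[Double], w: Seq[Double], link: Double => Double): Double =
    link(weightedMean(y, w))
}
```

For example, with a log link the intercept is `InterceptOnlyGlm.intercept(y, w, x => math.log(x))`. The concern raised above still applies: offsets, standard errors and any later feature would each need their own branch in such a special case.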
[GitHub] spark pull request #16760: [SPARK-18872][SQL][TESTS] New test cases for EXIS...
GitHub user dilipbiswal opened a pull request:

    https://github.com/apache/spark/pull/16760

    [SPARK-18872][SQL][TESTS] New test cases for EXISTS subquery (Aggregate, Having, Orderby, Limit)

    ## What changes were proposed in this pull request?
    This PR adds the second set of tests for EXISTS subquery.

    File name | Brief description
    --- | ---
    exists-aggregate.sql | Tests aggregate expressions in outer query and EXISTS subquery.
    exists-having.sql | Tests HAVING clause in subquery.
    exists-orderby-limit.sql | Tests EXISTS subquery support with ORDER BY and LIMIT clauses.

    DB2 results are attached here as reference:

    [exists-aggregate-db2.txt](https://github.com/apache/spark/files/743287/exists-aggregate-db2.txt)
    [exists-having-db2.txt](https://github.com/apache/spark/files/743286/exists-having-db2.txt)
    [exists-orderby-limit-db2.txt](https://github.com/apache/spark/files/743288/exists-orderby-limit-db2.txt)

    (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
    (If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

    Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dilipbiswal/spark exists-pr2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16760.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16760
[GitHub] spark issue #16760: [SPARK-18872][SQL][TESTS] New test cases for EXISTS subq...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16760

    **[Test build #72208 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72208/testReport)** for PR 16760 at commit [`2473e0c`](https://github.com/apache/spark/commit/2473e0c440a9d1cd761ae6d704d0aa02c63afd83).
[GitHub] spark issue #16759: [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN s...
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16759

    Can one of the admins verify this patch?
[GitHub] spark pull request #16650: [SPARK-16554][CORE] Automatically Kill Executors ...
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16650#discussion_r98782219

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
    @@ -187,6 +198,19 @@ private[scheduler] class BlacklistTracker (
               nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
               listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
               _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
    +          if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
    +            allocationClient match {
    +              case Some(allocationClient) =>
    +                logInfo(s"Killing all executors on blacklisted host $node " +
    +                  s"since spark.blacklist.kill is set.")
    +                if(allocationClient.killExecutorsOnHost(node) == false) {
    --- End diff --

    nit space after if before (
[GitHub] spark issue #14725: [SPARK-17161] [PYSPARK][ML] Add PySpark-ML JavaWrapper c...
Github user holdenk commented on the issue:

    https://github.com/apache/spark/pull/14725

    LGTM - thanks for doing this - please ping me on the follow up PRs with the `CountVectorizerModel`. Before merge would you mind updating the PR description for how this was tested to remove the note about the `CountVectorizerModel` since it isn't included in this PR?
[GitHub] spark pull request #16759: [SPARK-18871][SQL][TESTS] New test cases for IN/N...
GitHub user kevinyu98 opened a pull request:

    https://github.com/apache/spark/pull/16759

    [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN subquery 2nd batch

    ## What changes were proposed in this pull request?

    This is the 2nd batch of test cases for IN/NOT IN subquery. This PR contains these test cases:

    `in-limit.sql`
    `in-order-by.sql`
    `not-in-group-by.sql`

    These are the queries and results from running on DB2.

    [in-limit DB2 version](https://github.com/apache/spark/files/743267/in-limit.sql.db2.out.txt)
    [in-order-by DB2 version](https://github.com/apache/spark/files/743269/in-order-by.sql.db2.txt)
    [not-in-group-by DB2 version](https://github.com/apache/spark/files/743271/not-in-group-by.sql.db2.txt)
    [output of in-limit.sql DB2](https://github.com/apache/spark/files/743276/in-limit.sql.db2.out.txt)
    [output of in-order-by.sql DB2](https://github.com/apache/spark/files/743278/in-order-by.sql.db2.out.txt)
    [output of not-in-group-by.sql DB2](https://github.com/apache/spark/files/743279/not-in-group-by.sql.db2.out.txt)

    ## How was this patch tested?

    This PR adds new test cases.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kevinyu98/spark spark-18871-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16759.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16759
[GitHub] spark pull request #16650: [SPARK-16554][CORE] Automatically Kill Executors ...
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16650#discussion_r98781265

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
    @@ -187,6 +198,19 @@ private[scheduler] class BlacklistTracker (
               nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
               listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
               _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
    +          if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
    +            allocationClient match {
    +              case Some(allocationClient) =>
    +                logInfo(s"Killing all executors on blacklisted host $node " +
    +                  s"since spark.blacklist.kill is set.")
    --- End diff --

    config name wrong
[GitHub] spark pull request #16650: [SPARK-16554][CORE] Automatically Kill Executors ...
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16650#discussion_r98781173

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
    @@ -173,6 +174,16 @@ private[scheduler] class BlacklistTracker (
             listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
             executorIdToFailureList.remove(exec)
             updateNextExpiryTime()
    +        if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
    +          allocationClient match {
    +            case Some(allocationClient) =>
    +              logInfo(s"Killing blacklisted executor id $exec since spark.blacklist.kill is set.")
    --- End diff --

    the config name is wrong here now
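One way to avoid this kind of drift between a log message and a renamed setting is to build the message from the config entry's key instead of hard-coding the name. The sketch below only illustrates that idea; `BoolConfigEntry`, `KillEnabled` and the key string are hypothetical placeholders, not Spark's actual definitions or the real setting name.

```scala
object BlacklistLogSketch {
  // Hypothetical stand-in for a typed config entry; only `key` matters here.
  final case class BoolConfigEntry(key: String, default: Boolean)

  // Placeholder key for illustration only; not the real Spark setting name.
  val KillEnabled: BoolConfigEntry =
    BoolConfigEntry("example.blacklist.killEnabled", default = false)

  // The setting name in the message always follows the entry, so a rename cannot leave it stale.
  def killMessage(exec: String, entry: BoolConfigEntry): String =
    s"Killing blacklisted executor id $exec since ${entry.key} is set."
}
```

With this shape, renaming the setting means changing one string in the entry definition, and every log line that mentions it follows automatically.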
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98778359

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -313,6 +313,25 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         }
       }

    +  /**
    +   * Strategy to convert MapGroupsWithState logical operator to physical operator
    +   * in streaming plans. Conversion for batch plans is handled by [[BasicOperators]].
    +   */
    +  object MapGroupsWithStateStrategy extends Strategy {
    +    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    +      case MapGroupsWithState(
    +            func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr,
    --- End diff --

    I think we indent 4 or 2 past the class normally.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98776316

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
    @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
       }

       /**
    +   * ::Experimental::
    +   * (Scala-specific)
    +   * Applies the given function to each group of data, while using an additional keyed state.
    +   * For each unique group, the function will be passed the group key and an iterator that contains
    --- End diff --

    I would consider breaking this up to make it a little easier to follow:

    ```
    For each unique group, the given function will be invoked with the following arguments:
     - The key of the group.
     - A user-defined state object set by previous invocations of the given function. Note that, for batch queries, there is only ever one invocation and thus the state object will always be empty.
     - An iterator containing all the values for this key.
    ```
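The three arguments listed in the suggested wording can be illustrated without Spark. The sketch below simulates triggers with plain Scala collections: each batch is grouped by key, the function is called once per key with the key, the state left by earlier calls, and an iterator over the values, and the resulting state is carried to the next batch. `SimpleState`, `runningCount` and `processBatch` are hypothetical names for illustration only; this is not the API proposed in the PR.

```scala
object KeyedStateSketch {
  // Hypothetical, minimal state holder for illustration; not the Spark trait under review.
  final class SimpleState[S](initial: Option[S]) {
    private var value: Option[S] = initial
    def exists: Boolean = value.isDefined
    def get: S = value.get
    def update(newState: S): Unit = { value = Some(newState) }
    def remove(): Unit = { value = None }
    def toOption: Option[S] = value
  }

  // User function: receives the key, the state from earlier invocations, and the values for the key.
  def runningCount(key: String, state: SimpleState[Long], values: Iterator[Int]): (String, Long) = {
    val previous = if (state.exists) state.get else 0L
    val updated = previous + values.size
    state.update(updated)
    (key, updated)
  }

  // Simulates one trigger: group the batch by key, call the function once per key,
  // and return the updated store together with the function outputs (sorted by key).
  def processBatch(
      batch: Seq[(String, Int)],
      store: Map[String, Long]): (Map[String, Long], Seq[(String, Long)]) = {
    val perKey = batch.groupBy(_._1).toSeq.sortBy(_._1).map { case (key, rows) =>
      val state = new SimpleState[Long](store.get(key))
      val output = runningCount(key, state, rows.iterator.map(_._2))
      (key, state.toOption, output)
    }
    val newStore = perKey.foldLeft(store) {
      case (acc, (key, Some(s), _)) => acc.updated(key, s)
      case (acc, (key, None, _)) => acc - key
    }
    (newStore, perKey.map(_._3))
  }
}
```

Calling `processBatch` once with an empty store corresponds to the batch case described above: the state object is always empty on entry. Feeding the returned store into the next call corresponds to successive triggers of a streaming query.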
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98774221

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
    @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
       }

       /**
    +   * ::Experimental::
    +   * (Scala-specific)
    +   * Applies the given function to each group of data, while using an additional keyed state.
    +   * For each unique group, the function will be passed the group key and an iterator that contains
    +   * all of the elements in the group. The function can return an object of arbitrary type, and
    +   * optionally update or remove the corresponding state. The returned object will form a new
    +   * [[Dataset]].
    +   *
    +   * This function can be applied on both batch and streaming Datasets. With a streaming dataset,
    +   * this function will be once for each in every trigger. For each key, the updated state from the
    --- End diff --

    will be *called*
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98779817

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---
    @@ -235,3 +240,86 @@ case class StateStoreSaveExec(

       override def outputPartitioning: Partitioning = child.outputPartitioning
     }
    +
    +
    +/**
    + * Physical operator for executing streaming mapGroupsWithState.
    + */
    +case class MapGroupsWithStateExec(
    +    func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any],
    +    keyDeserializer: Expression, // probably not needed
    +    valueDeserializer: Expression,
    +    groupingAttributes: Seq[Attribute],
    +    dataAttributes: Seq[Attribute],
    +    outputObjAttr: Attribute,
    +    stateId: Option[OperatorStateId],
    +    stateDeserializer: Expression,
    +    stateSerializer: Seq[NamedExpression],
    +    child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter {
    +
    +  override def outputPartitioning: Partitioning = child.outputPartitioning
    +
    +  override def requiredChildDistribution: Seq[Distribution] =
    +    ClusteredDistribution(groupingAttributes) :: Nil
    +
    +  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
    +    Seq(groupingAttributes.map(SortOrder(_, Ascending))) // is this ordering needed?
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +
    +    child.execute().mapPartitionsWithStateStore[InternalRow](
    +      getStateId.checkpointLocation,
    +      operatorId = getStateId.operatorId,
    +      storeVersion = getStateId.batchId,
    +      groupingAttributes.toStructType,
    +      child.output.toStructType,
    +      sqlContext.sessionState,
    +      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
    +        try {
    --- End diff --

    This is existing, but should `mapPartitionsWithStateStore` be implementing the abort handling? It seems generic.
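The suggestion to move abort handling into the generic helper can be pictured as a loan-pattern wrapper: the helper owns commit and abort, and callers only supply the body. This is a rough sketch under simplifying assumptions; `Store` and `withStore` are hypothetical names, and the body is assumed to finish eagerly, whereas the operator in the diff returns a lazily consumed iterator, which would need extra care.

```scala
import scala.util.control.NonFatal

object StoreAbortSketch {
  // Hypothetical minimal store interface; not Spark's state store API.
  trait Store {
    def commit(): Unit
    def abort(): Unit
  }

  // Runs `body` against the store, commits on success, and aborts on any non-fatal failure
  // (including a failed commit) before rethrowing.
  def withStore[S <: Store, T](store: S)(body: S => T): T = {
    try {
      val result = body(store)
      store.commit()
      result
    } catch {
      case NonFatal(e) =>
        store.abort()
        throw e
    }
  }
}
```

Callers written against such a helper contain no `try`/`catch` of their own, which is the sense in which the handling "seems generic".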
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98778114

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.{Experimental, InterfaceStability}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalState
    +
    +/**
    + * :: Experimental ::
    + *
    + * Wrapper class for interacting with state data in `mapGroupsWithState` and
    + * `flatMapGroupsWithState` operations on
    + * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
    + *
    + * @note Operations on state are not threadsafe.
    + *
    + * Scala example of using `State`:
    + * {{{
    + *    // A mapping function that maintains an integer state for string keys and returns a string.
    + *    def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = {
    + *      // Check if state exists
    + *      if (state.exists) {
    + *        val existingState = state.get  // Get the existing state
    + *        val shouldRemove = ...         // Decide whether to remove the state
    + *        if (shouldRemove) {
    + *          state.remove()     // Remove the state
    + *        } else {
    + *          val newState = ...
    + *          state.update(newState)    // Set the new state
    + *        }
    + *      } else {
    + *        val initialState = ...
    + *        state.update(initialState)  // Set the initial state
    + *      }
    + *      ... // return something
    + *    }
    + *
    + * }}}
    + *
    + * Java example of using `State`:
    + * {{{
    + *    // A mapping function that maintains an integer state for string keys and returns a string.
    + *    Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
    + *       new Function3<String, Optional<Integer>, State<Integer>, String>() {
    + *
    + *         @Override
    + *         public String call(String key, Optional<Integer> value, State<Integer> state) {
    + *           if (state.exists()) {
    + *             int existingState = state.get(); // Get the existing state
    + *             boolean shouldRemove = ...; // Decide whether to remove the state
    + *             if (shouldRemove) {
    + *               state.remove(); // Remove the state
    + *             } else {
    + *               int newState = ...;
    + *               state.update(newState); // Set the new state
    + *             }
    + *           } else {
    + *             int initialState = ...; // Set the initial state
    + *             state.update(initialState);
    + *           }
    + *           ... // return something
    + *         }
    + *       };
    + * }}}
    + *
    + * @tparam S Type of the state
    + * @since 2.1.1
    + */
    +@Experimental
    +@InterfaceStability.Evolving
    +trait State[S] extends LogicalState[S] {
    +
    +  def exists: Boolean
    +
    +  def get(): S
    +
    +  def update(newState: S): Unit
    +
    +  def remove(): Unit
    --- End diff --

    Scala doc for these, even though it's pretty obvious. In particular, I assume it's safe to call update(), get() and remove() more than once in the function?
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98779663

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---
    @@ -235,3 +240,86 @@ case class StateStoreSaveExec(

       override def outputPartitioning: Partitioning = child.outputPartitioning
     }
    +
    +
    +/**
    + * Physical operator for executing streaming mapGroupsWithState.
    + */
    +case class MapGroupsWithStateExec(
    +    func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any],
    +    keyDeserializer: Expression, // probably not needed
    +    valueDeserializer: Expression,
    +    groupingAttributes: Seq[Attribute],
    +    dataAttributes: Seq[Attribute],
    +    outputObjAttr: Attribute,
    +    stateId: Option[OperatorStateId],
    +    stateDeserializer: Expression,
    +    stateSerializer: Seq[NamedExpression],
    +    child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter {
    +
    +  override def outputPartitioning: Partitioning = child.outputPartitioning
    +
    +  override def requiredChildDistribution: Seq[Distribution] =
    +    ClusteredDistribution(groupingAttributes) :: Nil
    +
    +  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
    +    Seq(groupingAttributes.map(SortOrder(_, Ascending))) // is this ordering needed?
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +
    --- End diff --

    nit: extra newline
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98775275

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
    @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
       }

       /**
    +   * ::Experimental::
    +   * (Scala-specific)
    +   * Applies the given function to each group of data, while using an additional keyed state.
    +   * For each unique group, the function will be passed the group key and an iterator that contains
    +   * all of the elements in the group. The function can return an object of arbitrary type, and
    +   * optionally update or remove the corresponding state. The returned object will form a new
    +   * [[Dataset]].
    +   *
    +   * This function can be applied on both batch and streaming Datasets. With a streaming dataset,
    +   * this function will be once for each in every trigger. For each key, the updated state from the
    +   * function call in a trigger will be the state available in the function call in the next
    --- End diff --

    Any updates to the state will be stored and passed to the user given function in subsequent batches when executed as a Streaming Query.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98778548

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    --- End diff --

    Should this be in `streaming`?
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98779439

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---
    @@ -235,3 +240,86 @@ case class StateStoreSaveExec(

       override def outputPartitioning: Partitioning = child.outputPartitioning
     }
    +
    +
    +/**
    + * Physical operator for executing streaming mapGroupsWithState.
    + */
    +case class MapGroupsWithStateExec(
    +    func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any],
    +    keyDeserializer: Expression, // probably not needed
    +    valueDeserializer: Expression,
    +    groupingAttributes: Seq[Attribute],
    +    dataAttributes: Seq[Attribute],
    +    outputObjAttr: Attribute,
    +    stateId: Option[OperatorStateId],
    +    stateDeserializer: Expression,
    +    stateSerializer: Seq[NamedExpression],
    +    child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter {
    +
    +  override def outputPartitioning: Partitioning = child.outputPartitioning
    +
    +  override def requiredChildDistribution: Seq[Distribution] =
    +    ClusteredDistribution(groupingAttributes) :: Nil
    +
    +  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
    +    Seq(groupingAttributes.map(SortOrder(_, Ascending))) // is this ordering needed?
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +
    +    child.execute().mapPartitionsWithStateStore[InternalRow](
    --- End diff --

    I don't think you need the type here?
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98777503

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.{Experimental, InterfaceStability}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalState
    +
    +/**
    + * :: Experimental ::
    + *
    + * Wrapper class for interacting with state data in `mapGroupsWithState` and
    --- End diff --

    I think we want to say what state we are talking about. Something like "per-key state from previous invocations of the function in a StreamingQuery"
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98780383 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala --- @@ -144,6 +145,12 @@ object ObjectOperator { (i: InternalRow) => proj(i).get(0, deserializer.dataType) } + def deserializeRowToObject( +deserializer: Expression): InternalRow => Any = { --- End diff -- indent, also does this not fit?
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98779599 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala --- @@ -235,3 +240,86 @@ case class StateStoreSaveExec( override def outputPartitioning: Partitioning = child.outputPartitioning } + + +/** + * Physical operator for executing streaming mapGroupsWithState. + */ +case class MapGroupsWithStateExec( +func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any], +keyDeserializer: Expression, // probably not needed +valueDeserializer: Expression, +groupingAttributes: Seq[Attribute], +dataAttributes: Seq[Attribute], +outputObjAttr: Attribute, +stateId: Option[OperatorStateId], +stateDeserializer: Expression, +stateSerializer: Seq[NamedExpression], +child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter { + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) // is this ordering needed? --- End diff -- Yes, the GroupedIterator relies on sorting.
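As a rough illustration of why a one-pass grouping iterator depends on sorted input: it can only detect a group boundary between adjacent rows, so equal keys have to arrive next to each other. The sketch below is a simplified stand-in and not Spark's `GroupedIterator`; the names `GroupConsecutiveSketch` and `groupConsecutive` are hypothetical.

```scala
// Hypothetical helper, not Spark's GroupedIterator: groups *adjacent* rows
// that share a key, in a single pass over the input.
object GroupConsecutiveSketch {
  def groupConsecutive[K, V](rows: Iterator[(K, V)]): Iterator[(K, List[V])] =
    new Iterator[(K, List[V])] {
      private val buffered = rows.buffered

      override def hasNext: Boolean = buffered.hasNext

      override def next(): (K, List[V]) = {
        val key = buffered.head._1
        val values = List.newBuilder[V]
        // Consume rows only while the key stays the same.
        while (buffered.hasNext && buffered.head._1 == key) {
          values += buffered.next()._2
        }
        (key, values.result())
      }
    }

  def main(args: Array[String]): Unit = {
    val unsorted = List("a" -> 1, "b" -> 2, "a" -> 3)
    // Unsorted input: key "a" ends up split across two groups.
    println(groupConsecutive(unsorted.iterator).toList)
    // Sorted input: exactly one group per key.
    println(groupConsecutive(unsorted.sortBy(_._1).iterator).toList)
  }
}
```

With unsorted input the same key can surface more than once, which for a stateful operator would mean the user function runs several times for one key within a trigger. Requiring an ordering on the grouping attributes avoids that.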
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98779015 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -90,6 +93,14 @@ class IncrementalExecution( keys, Some(stateId), child) :: Nil)) + case MapGroupsWithStateExec( +func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, --- End diff -- indent inconsistent with above.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98775825 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. --- End diff -- while maintaining some user-defined state for each key.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98780164 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala --- @@ -184,7 +189,7 @@ case class StateStoreSaveExec( } // Assumption: Append mode can be done only when watermark has been specified -store.remove(watermarkPredicate.get.eval) +store.remove(watermarkPredicate.get.eval _) --- End diff -- Why this change?
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98778823 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.State + +/** Internal implementation of the [[State]] interface */ +private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] { + private var value: S = optionalValue.getOrElse(null.asInstanceOf[S]) + private var defined: Boolean = optionalValue.isDefined + private var updated: Boolean = false // whether value has been updated (but not removed) + private var removed: Boolean = false // whether value has eben removed + + // = Public API = + override def exists: Boolean = { +defined + } + + override def get(): S = { --- End diff -- no `()`
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98779142 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala --- @@ -54,6 +55,18 @@ trait StatefulOperator extends SparkPlan { } } +trait StateStoreReader extends StatefulOperator { --- End diff -- and lowercase if it contains multiple classes
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777860 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.catalyst.plans.logical.LogicalState + +/** + * :: Experimental :: + * + * Wrapper class for interacting with state data in `mapGroupsWithState` and + * `flatMapGroupsWithState` operations on + * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]]. + * + * @note Operations on state are not threadsafe. + * + * Scala example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = { + * // Check if state exists + * if (state.exists) { + *val existingState = state.get // Get the existing state + *val shouldRemove = ... // Decide whether to remove the state + *if (shouldRemove) { + * state.remove() // Remove the state + *} else { + * val newState = ... 
+ * state.update(newState)// Set the new state + *} + * } else { + *val initialState = ... + *state.update(initialState) // Set the initial state + * } + * ... // return something + *} + * + * }}} + * + * Java example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *Function3, State, String> mappingFunction = + * new Function3, State, String>() { + * + * @Override + * public String call(String key, Optional value, State state) { + * if (state.exists()) { + * int existingState = state.get(); // Get the existing state + * boolean shouldRemove = ...; // Decide whether to remove the state + * if (shouldRemove) { + * state.remove(); // Remove the state + * } else { + * int newState = ...; + * state.update(newState); // Set the new state + * } + * } else { + * int initialState = ...; // Set the initial state + * state.update(initialState); + * } + * ... // return something + * } + * }; + * }}} + * + * @tparam S Type of the state + * @since 2.1.1 + */ +@Experimental +@InterfaceStability.Evolving +trait State[S] extends LogicalState[S] { + + def exists: Boolean + + def get(): S --- End diff -- No `()`.
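For context on the "no `()`" remarks: the usual Scala convention is to declare side-effect-free accessors without parentheses and to keep `()` on methods that mutate. A minimal sketch of that convention follows; `Cell` and `CellDemo` are hypothetical names and not part of the PR.

```scala
// Hypothetical class for illustration only; it is not the State trait from the PR.
class Cell[S](private var value: S) {
  // Pure accessor: declared and called without parentheses.
  def get: S = value

  // Mutating method: the parentheses signal a side effect.
  def clear(): Unit = { value = null.asInstanceOf[S] }
}

object CellDemo {
  def main(args: Array[String]): Unit = {
    val cell = new Cell[String]("x")
    println(cell.get) // reads like a field access
    cell.clear()      // visibly an action
    println(cell.get == null)
  }
}
```

Java callers are unaffected by the choice, since a parameterless Scala method still compiles to a zero-argument method that Java invokes as `get()`.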
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98779264 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala --- @@ -54,6 +55,18 @@ trait StatefulOperator extends SparkPlan { } } +trait StateStoreReader extends StatefulOperator { + override lazy val metrics = Map( +"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) +} + +trait StateStoreWriter extends StatefulOperator { --- End diff -- Scala doc for both of these.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777585 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.catalyst.plans.logical.LogicalState + +/** + * :: Experimental :: + * + * Wrapper class for interacting with state data in `mapGroupsWithState` and + * `flatMapGroupsWithState` operations on + * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]]. + * + * @note Operations on state are not threadsafe. + * + * Scala example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = { + * // Check if state exists + * if (state.exists) { + *val existingState = state.get // Get the existing state + *val shouldRemove = ... // Decide whether to remove the state + *if (shouldRemove) { + * state.remove() // Remove the state + *} else { + * val newState = ... 
+ * state.update(newState)// Set the new state + *} + * } else { + *val initialState = ... + *state.update(initialState) // Set the initial state + * } + * ... // return something + *} + * + * }}} + * + * Java example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *Function3, State, String> mappingFunction = + * new Function3, State, String>() { + * + * @Override + * public String call(String key, Optional value, State state) { + * if (state.exists()) { + * int existingState = state.get(); // Get the existing state + * boolean shouldRemove = ...; // Decide whether to remove the state + * if (shouldRemove) { + * state.remove(); // Remove the state + * } else { + * int newState = ...; + * state.update(newState); // Set the new state + * } + * } else { + * int initialState = ...; // Set the initial state + * state.update(initialState); + * } + * ... // return something + * } + * }; + * }}} + * + * @tparam S Type of the state + * @since 2.1.1 + */ +@Experimental +@InterfaceStability.Evolving +trait State[S] extends LogicalState[S] { --- End diff -- `KeyState`? State just feels very generic.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98778773 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.State + +/** Internal implementation of the [[State]] interface */ +private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] { --- End diff -- I would consider using `null` here to avoid extra allocations in the critical path.
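One possible reading of the `null` suggestion is to accept a possibly-null value directly, so that no `Option` wrapper has to be allocated for every key. The sketch below is written under that assumption; `NullableStateSketch` and its members are hypothetical and simplified relative to the `StateImpl` in the diff.

```scala
// Hypothetical, simplified variant of the StateImpl in the diff. The constructor
// takes a possibly-null value instead of an Option, so the caller does not need
// to allocate a Some/None wrapper per key.
class NullableStateSketch[S >: Null](initial: S) {
  private var value: S = initial
  private var defined: Boolean = initial != null

  def exists: Boolean = defined

  def get: S = {
    if (!defined) throw new NoSuchElementException("State is not set")
    value
  }

  def update(newValue: S): Unit = {
    // null is reserved to mean "no state", so it cannot be stored as a value.
    require(newValue != null, "null is reserved to mean 'no state'")
    value = newValue
    defined = true
  }

  def remove(): Unit = {
    value = null
    defined = false
  }
}
```

The trade-off in this sketch is that the `S >: Null` bound restricts the state to reference types, so a primitive such as `Int` would have to be boxed (for example as `java.lang.Integer`).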
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98776628 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next + * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the + * function is called only once per key without any prior state. + * + * There is no guaranteed ordering of values in the iterator in the function. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @see [[State]] for more details of how to update/remove state in the function. 
+ * @since 2.1.1 + */ + @Experimental + @InterfaceStability.Evolving + def mapGroupsWithState[STATE: Encoder, OUT: Encoder]( --- End diff -- I think it would be useful to come up with an example use case since this is pretty complicated.
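As one candidate use case, a running event count per user could look roughly like the sketch below. It does not use Spark: `SimpleState` is a hypothetical stand-in for the `State[S]` trait under review, and `runTrigger` is a toy driver that imitates the documented streaming behaviour (the function is called once per key in each trigger, and the state it leaves behind is what the next trigger sees).

```scala
import scala.collection.mutable

// Hypothetical stand-in for the State[S] trait under review (no Spark required).
class SimpleState[S](private var value: Option[S]) {
  def exists: Boolean = value.isDefined
  def get: S = value.get
  def update(newValue: S): Unit = { value = Some(newValue) }
  def remove(): Unit = { value = None }
  def toOption: Option[S] = value
}

object RunningCountSketch {
  // User function with the (key, values, state) => output shape from the diff:
  // it keeps a running event count per key across triggers.
  def countEvents(key: String, events: Iterator[String], state: SimpleState[Long]): (String, Long) = {
    val previous = if (state.exists) state.get else 0L
    val total = previous + events.size
    state.update(total)
    (key, total)
  }

  // Toy driver imitating one streaming trigger: group the batch by key, call the
  // function once per key, and persist whatever state the function leaves behind.
  def runTrigger(batch: Seq[(String, String)], store: mutable.Map[String, Long]): Map[String, Long] = {
    batch.groupBy(_._1).map { case (key, rows) =>
      val state = new SimpleState[Long](store.get(key))
      val result = countEvents(key, rows.iterator.map(_._2), state)
      state.toOption match {
        case Some(s) => store(key) = s
        case None => store.remove(key)
      }
      result
    }
  }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map.empty[String, Long]
    println(runTrigger(Seq("u1" -> "click", "u2" -> "view", "u1" -> "click"), store))
    println(runTrigger(Seq("u1" -> "view"), store))
  }
}
```

In the second trigger the count for `u1` continues from the value stored by the first one. Running the same function over a single batch with an empty store corresponds to the batch case described in the doc comment, where each key is seen once without prior state.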
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777095 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next + * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the + * function is called only once per key without any prior state. + * + * There is no guaranteed ordering of values in the iterator in the function. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @see [[State]] for more details of how to update/remove state in the function. 
+ * @since 2.1.1 + */ + @Experimental + @InterfaceStability.Evolving + def mapGroupsWithState[STATE: Encoder, OUT: Encoder]( + func: (K, Iterator[V], State[STATE]) => OUT): Dataset[OUT] = { +val f = (key: K, it: Iterator[V], s: State[STATE]) => Iterator(func(key, it, s)) +flatMapGroupsWithState[STATE, OUT](f) + } + + /** + * ::Experimental:: + * (Java-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next + * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the + * function is called only once per key without any prior state. + * + * There is no guaranteed ordering of values in the iterator in the function. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @see [[State]] for more details of how to update/remove state in the function. 
+ * @since 2.1.1 + */ + @Experimental + @InterfaceStability.Evolving + def mapGroupsWithState[STATE, OUT]( --- End diff -- Since this is pretty complicated and we might iterate over time, I wonder if we shouldn't just put the full explanation in `State` and link there with a very brief version for each function. Not sure...
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98776538 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next + * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the + * function is called only once per key without any prior state. + * + * There is no guaranteed ordering of values in the iterator in the function. --- End diff -- I'd maybe put these into bullets as well.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98778649 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.State + +/** Internal implementation of the [[State]] interface */ +private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] { + private var value: S = optionalValue.getOrElse(null.asInstanceOf[S]) + private var defined: Boolean = optionalValue.isDefined + private var updated: Boolean = false // whether value has been updated (but not removed) + private var removed: Boolean = false // whether value has eben removed --- End diff -- "been"
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98776799 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql]( } /** + * ::Experimental:: + * (Scala-specific) + * Applies the given function to each group of data, while using an additional keyed state. + * For each unique group, the function will be passed the group key and an iterator that contains + * all of the elements in the group. The function can return an object of arbitrary type, and + * optionally update or remove the corresponding state. The returned object will form a new + * [[Dataset]]. + * + * This function can be applied on both batch and streaming Datasets. With a streaming dataset, + * this function will be once for each in every trigger. For each key, the updated state from the + * function call in a trigger will be the state available in the function call in the next + * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the + * function is called only once per key without any prior state. + * + * There is no guaranteed ordering of values in the iterator in the function. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @see [[State]] for more details of how to update/remove state in the function. 
+ * @since 2.1.1 + */ + @Experimental + @InterfaceStability.Evolving + def mapGroupsWithState[STATE: Encoder, OUT: Encoder]( --- End diff -- I would also stick with `S` and `U` to match other functions in the class.
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777806 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.catalyst.plans.logical.LogicalState + +/** + * :: Experimental :: + * + * Wrapper class for interacting with state data in `mapGroupsWithState` and + * `flatMapGroupsWithState` operations on + * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]]. + * + * @note Operations on state are not threadsafe. + * + * Scala example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = { + * // Check if state exists + * if (state.exists) { + *val existingState = state.get // Get the existing state + *val shouldRemove = ... // Decide whether to remove the state + *if (shouldRemove) { + * state.remove() // Remove the state + *} else { + * val newState = ... 
+ * state.update(newState)// Set the new state + *} + * } else { + *val initialState = ... + *state.update(initialState) // Set the initial state + * } + * ... // return something + *} + * + * }}} + * + * Java example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *Function3, State, String> mappingFunction = + * new Function3, State, String>() { + * + * @Override + * public String call(String key, Optional value, State state) { + * if (state.exists()) { + * int existingState = state.get(); // Get the existing state + * boolean shouldRemove = ...; // Decide whether to remove the state + * if (shouldRemove) { + * state.remove(); // Remove the state + * } else { + * int newState = ...; + * state.update(newState); // Set the new state + * } + * } else { + * int initialState = ...; // Set the initial state + * state.update(initialState); + * } + * ... // return something + * } + * }; + * }}} + * + * @tparam S Type of the state --- End diff -- A user defined type that can be stored for each key. Must be encodable?
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/16758#discussion_r98777964 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql.catalyst.plans.logical.LogicalState + +/** + * :: Experimental :: + * + * Wrapper class for interacting with state data in `mapGroupsWithState` and + * `flatMapGroupsWithState` operations on + * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]]. + * + * @note Operations on state are not threadsafe. + * + * Scala example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *def mappingFunction(key: String, value: Iterable[Int], state: State[Int]): Option[String] = { + * // Check if state exists + * if (state.exists) { + *val existingState = state.get // Get the existing state + *val shouldRemove = ... // Decide whether to remove the state + *if (shouldRemove) { + * state.remove() // Remove the state + *} else { + * val newState = ... 
+ * state.update(newState)// Set the new state + *} + * } else { + *val initialState = ... + *state.update(initialState) // Set the initial state + * } + * ... // return something + *} + * + * }}} + * + * Java example of using `State`: + * {{{ + *// A mapping function that maintains an integer state for string keys and returns a string. + *Function3, State, String> mappingFunction = + * new Function3, State, String>() { + * + * @Override + * public String call(String key, Optional value, State state) { + * if (state.exists()) { + * int existingState = state.get(); // Get the existing state + * boolean shouldRemove = ...; // Decide whether to remove the state + * if (shouldRemove) { + * state.remove(); // Remove the state + * } else { + * int newState = ...; + * state.update(newState); // Set the new state + * } + * } else { + * int initialState = ...; // Set the initial state + * state.update(initialState); + * } + * ... // return something + * } + * }; + * }}} + * + * @tparam S Type of the state + * @since 2.1.1 + */ +@Experimental +@InterfaceStability.Evolving +trait State[S] extends LogicalState[S] { + + def exists: Boolean + + def get(): S + + def update(newState: S): Unit + + def remove(): Unit + + @inline final def getOption(): Option[S] = if (exists) Some(get()) else None --- End diff -- No `()`.
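A note on the `No ()` remark: by common Scala convention, parameterless methods without side effects are declared and called without parentheses, while mutating ones such as `remove()` keep them. The following is a minimal sketch of that shape, assuming a simplified stand-in trait. `KeyedState` and `InMemoryState` are hypothetical names used only for illustration; they are not Spark classes and say nothing about how the real state store works.

```scala
// Hypothetical stand-in for the trait under review; not Spark's implementation.
trait KeyedState[S] {
  def exists: Boolean               // pure accessor: no parentheses
  def get: S                        // pure accessor: no parentheses
  def update(newState: S): Unit     // mutates the state
  def remove(): Unit                // side effect: keeps parentheses

  // Pure accessor built from the two above, so it is also declared without ().
  final def getOption: Option[S] = if (exists) Some(get) else None
}

// Simple in-memory implementation, used only to exercise the trait.
final class InMemoryState[S] extends KeyedState[S] {
  private var value: Option[S] = None

  def exists: Boolean = value.isDefined
  def get: S = value.getOrElse(throw new NoSuchElementException("state is not set"))
  def update(newState: S): Unit = { value = Some(newState) }
  def remove(): Unit = { value = None }
}
```

With this shape a caller writes `state.getOption` and `state.exists`, but `state.remove()`, which makes the side-effecting call visible at the call site.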
[GitHub] spark pull request #14725: [SPARK-17161] [PYSPARK][ML] Add PySpark-ML JavaWr...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/14725#discussion_r98780626 --- Diff: python/pyspark/ml/wrapper.py --- @@ -16,6 +16,10 @@ # from abc import ABCMeta, abstractmethod +import sys --- End diff -- That looks fine, we use xrange similarly elsewhere.
[GitHub] spark pull request #16722: [SPARK-9478][ML][MLlib] Add sample weights to dec...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/16722#discussion_r98778763 --- Diff: mllib-local/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala --- @@ -48,7 +48,7 @@ object TestingUtils { /** * Private helper function for comparing two values using absolute tolerance. */ - private def AbsoluteErrorComparison(x: Double, y: Double, eps: Double): Boolean = { + private[ml] def AbsoluteErrorComparison(x: Double, y: Double, eps: Double): Boolean = { --- End diff -- It's not used. I just changed the scope of both methods, I can change it back of course. I don't see a great reason to make this public since most users will use `relTol` instead. I'm open to other opinions though.
[GitHub] spark pull request #16696: [SPARK-19350] [SQL] Cardinality estimation of Lim...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16696#discussion_r98776491 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -727,37 +728,18 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN } override def computeStats(conf: CatalystConf): Statistics = { val limit = limitExpr.eval().asInstanceOf[Int] -val sizeInBytes = if (limit == 0) { - // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero - // (product of children). - 1 -} else { - (limit: Long) * output.map(a => a.dataType.defaultSize).sum -} -child.stats(conf).copy(sizeInBytes = sizeInBytes) +val childStats = child.stats(conf) +// Don't propagate column stats, because we don't know the distribution after a limit operation +Statistics( + sizeInBytes = EstimationUtils.getOutputSize(output, limit, childStats.attributeStats), --- End diff -- Agreed. We can pick the smaller value between the child node's row count and the limit number.
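The rule agreed on above (take the smaller of the child's row count and the limit) can be sketched as a tiny pure function. This is only an illustration: `LimitEstimate` and `limitedRowCount` are hypothetical names, not part of Spark, and falling back to the limit when the child has no row-count statistic is an assumption made here rather than something stated in the thread.

```scala
// Hypothetical helper for illustration; not Spark's EstimationUtils.
object LimitEstimate {
  // childRowCount is None when the child plan carries no row-count statistic.
  def limitedRowCount(childRowCount: Option[BigInt], limit: Int): BigInt =
    childRowCount match {
      // A limit can never produce more rows than its child has.
      case Some(rows) => rows.min(BigInt(limit))
      // Assumption: without a child estimate, the limit itself is the upper bound.
      case None       => BigInt(limit)
    }
}
```

For example, a child estimated at 10 rows under `LIMIT 3` yields 3, while a child estimated at 2 rows under the same limit yields 2.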
[GitHub] spark issue #16666: [SPARK-19319][SparkR]:SparkR Kmeans summary returns erro...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/1 **[Test build #72207 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72207/testReport)** for PR 1 at commit [`2110536`](https://github.com/apache/spark/commit/2110536cab25ff690bf60ad28d08d69cd648e97c).
[GitHub] spark issue #16666: [SPARK-19319][SparkR]:SparkR Kmeans summary returns erro...
Github user wangmiao1981 commented on the issue: https://github.com/apache/spark/pull/1 ping @felixcheung
[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16757 Merged build finished. Test PASSed.
[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16757 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72203/ Test PASSed.
[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16757 **[Test build #72203 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72203/testReport)** for PR 16757 at commit [`6aad5d8`](https://github.com/apache/spark/commit/6aad5d844b26f675cf13d3c898da785271ffcc7c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #16730: [SPARK-19395][SparkR]Convert coefficients in summ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16730
[GitHub] spark issue #16730: [SPARK-19395][SparkR]Convert coefficients in summary to ...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/16730 merged to master, thanks!
[GitHub] spark issue #16740: [SPARK-19400][ML] Allow GLM to handle intercept only mod...
Github user sethah commented on the issue: https://github.com/apache/spark/pull/16740 Allowing offset will only require a small change to the intercept calculation, won't it?
```scala
val agg = data.agg(sum(w * (col("label") - col("label"))), sum(w)).first()
link.link(agg.getDouble(0) / agg.getDouble(1))
```
I'm still in favor of fixing the IRLS bug, but we should be able to return an analytic result without too much trouble, unless I'm missing something.
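As written, the snippet in the comment above subtracts `col("label")` from itself, which would always sum to zero. The sketch below assumes the second term was meant to be an offset column and mirrors the same expression on plain Scala collections, with no Spark dependency. `Obs`, `InterceptOnly`, and the `link` parameter are hypothetical names for illustration, and whether this closed form is exact for non-identity links is not established in the thread.

```scala
// Illustration only; assumes an offset column was intended in the comment's expression.
object InterceptOnly {
  // Hypothetical row type: response, offset, and observation weight.
  final case class Obs(label: Double, offset: Double, weight: Double)

  // Computes link( sum(w * (label - offset)) / sum(w) ), as in the comment.
  def intercept(data: Seq[Obs], link: Double => Double): Double = {
    require(data.nonEmpty, "need at least one observation")
    val weightedSum = data.map(o => o.weight * (o.label - o.offset)).sum
    val weightTotal = data.map(_.weight).sum
    require(weightTotal > 0.0, "weights must sum to a positive value")
    link(weightedSum / weightTotal)
  }
}
```

With an identity link, observations `(label = 3, offset = 1, weight = 1)` and `(label = 5, offset = 1, weight = 3)` give `(1*2 + 3*4) / 4 = 3.5`.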
[GitHub] spark pull request #16729: [SPARK-19391][SparkR][ML] Tweedie GLM API for Spa...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/16729#discussion_r98760363 --- Diff: R/pkg/R/mllib_regression.R --- @@ -53,12 +53,19 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj")) #' the result of a call to a family function. Refer R family at #' \url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}. #' Currently these families are supported: \code{binomial}, \code{gaussian}, -#' \code{Gamma}, and \code{poisson}. +#' \code{Gamma}, \code{poisson} and \code{"tweedie"}. +#' The tweedie family must be specified using character string \code{"tweedie"}. The family +#' function \code{tweedie} in the \code{statmod} package is not supported. --- End diff -- I prefer not having hard dependency on an external package in R, but if statmod is present, could we support `family = tweedie` (not string) too?
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r98749100 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala --- @@ -135,7 +136,24 @@ private[kafka010] class KafkaSourceRDD( } else { new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() { val consumer = CachedKafkaConsumer.getOrCreate( - range.topic, range.partition, executorKafkaParams) +range.topic, range.partition, executorKafkaParams, reuseKafkaConsumer) +if (range.fromOffset < 0 || range.untilOffset < 0) { + // Late bind the offset range + val fromOffset = if (range.fromOffset < 0) { + consumer.rawConsumer.seekToBeginning(ju.Arrays.asList(range.topicPartition)) +consumer.rawConsumer.position(range.topicPartition) + } else { +range.fromOffset + } + val untilOffset = if (range.untilOffset < 0) { + consumer.rawConsumer.seekToEnd(ju.Arrays.asList(range.topicPartition)) --- End diff -- nit: add assert(range.fromOffset == -1) to avoid breaking it in future.
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r98746133 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala --- @@ -249,6 +249,24 @@ class KafkaTestUtils extends Logging { offsets } + def cleanupLogs(): Unit = { +server.logManager.cleanupLogs() + } + + def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = { +val kc = new KafkaConsumer[String, String](consumerConfiguration) +logInfo("Created consumer to get latest offsets") --- End diff -- nit: please fix the log
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r98746146 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala --- @@ -249,6 +249,24 @@ class KafkaTestUtils extends Logging { offsets } + def cleanupLogs(): Unit = { +server.logManager.cleanupLogs() + } + + def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = { +val kc = new KafkaConsumer[String, String](consumerConfiguration) +logInfo("Created consumer to get latest offsets") +kc.subscribe(topics.asJavaCollection) +kc.poll(0) +val partitions = kc.assignment() +kc.pause(partitions) +kc.seekToBeginning(partitions) +val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap +kc.close() +logInfo("Closed consumer to get latest offsets") --- End diff -- nit: please fix the log
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r98746070 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala --- @@ -274,6 +292,11 @@ class KafkaTestUtils extends Logging { props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") props.put("delete.topic.enable", "true") +withBrokerProps.map { p => --- End diff -- nit: you can change the type of `withBrokerProps` to `Map[String, Object]`. Then here you can just use `props.putAll(withBrokerProps.asJava)`.
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r98756883 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala --- @@ -108,7 +113,14 @@ private[kafka010] class KafkaOffsetReaderImpl( def close(): Unit = consumer.close() - def fetchSpecificStartingOffsets( + override def fetchTopicPartitions(): Set[TopicPartition] = { +assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) +// Poll to get the latest assigned partitions +consumer.poll(0) +consumer.assignment().asScala.toSet --- End diff -- nit: please also call `pause` like this to avoid fetching the real data when reusing the relation.
```
val partitions = consumer.assignment()
consumer.pause(partitions)
partitions.asScala.toSet
```
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r98749048 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala --- @@ -135,7 +136,24 @@ private[kafka010] class KafkaSourceRDD( } else { new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() { val consumer = CachedKafkaConsumer.getOrCreate( - range.topic, range.partition, executorKafkaParams) +range.topic, range.partition, executorKafkaParams, reuseKafkaConsumer) +if (range.fromOffset < 0 || range.untilOffset < 0) { + // Late bind the offset range + val fromOffset = if (range.fromOffset < 0) { + consumer.rawConsumer.seekToBeginning(ju.Arrays.asList(range.topicPartition)) --- End diff -- nit: add `assert(range.fromOffset == -2)` to avoid breaking it in future.
[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16758 Merged build finished. Test FAILed.
[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16758 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72206/ Test FAILed.
[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16758 **[Test build #72206 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72206/testReport)** for PR 16758 at commit [`8be63de`](https://github.com/apache/spark/commit/8be63de507f9e4fc258aca1d467509e06b674565). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16758 **[Test build #72206 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72206/testReport)** for PR 16758 at commit [`8be63de`](https://github.com/apache/spark/commit/8be63de507f9e4fc258aca1d467509e06b674565).
[GitHub] spark issue #16751: [SPARK-19409][BUILD] Bump parquet version to 1.8.2
Github user rxin commented on the issue: https://github.com/apache/spark/pull/16751 can you put rest of the cleanups in one place?
[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14866#discussion_r98751522 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -933,6 +936,47 @@ object CombineLimits extends Rule[LogicalPlan] { } /** + * Check if there any cartesian products between joins of any type in the optimized plan tree. + * Throw an error if a cartesian product is found without an explicit cross join specified. + * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is true. + * + * This rule must be run AFTER the ReorderJoin rule since the join conditions for each join must be + * collected before checking if it is a cartesian product. If you have + * SELECT * from R, S where R.r = S.s, + * the join between R and S is not a cartesian product and therefore should be allowed. + * The predicate R.r = S.s is not recognized as a join condition until the ReorderJoin rule. + */ +case class CheckCartesianProducts(conf: CatalystConf) --- End diff -- If we do it in the optimizer, it does not cover all the cases, right? Cartesian Product might be chosen based on the statistics.
[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14866#discussion_r98751322

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -933,6 +936,47 @@ object CombineLimits extends Rule[LogicalPlan] {
     }

     /**
    + * Check if there any cartesian products between joins of any type in the optimized plan tree.
    + * Throw an error if a cartesian product is found without an explicit cross join specified.
    + * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is true.
    + *
    + * This rule must be run AFTER the ReorderJoin rule since the join conditions for each join must be
    + * collected before checking if it is a cartesian product. If you have
    + * SELECT * from R, S where R.r = S.s,
    + * the join between R and S is not a cartesian product and therefore should be allowed.
    + * The predicate R.r = S.s is not recognized as a join condition until the ReorderJoin rule.
    + */
    +case class CheckCartesianProducts(conf: CatalystConf)
    --- End diff --

    If we do it in the optimizer, it does not cover all the cases, right? Cartesian Product might be chosen based on the statistics.
[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14866#discussion_r98750834

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -933,6 +936,47 @@ object CombineLimits extends Rule[LogicalPlan] {
     }

     /**
    + * Check if there any cartesian products between joins of any type in the optimized plan tree.
    + * Throw an error if a cartesian product is found without an explicit cross join specified.
    + * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is true.
    + *
    + * This rule must be run AFTER the ReorderJoin rule since the join conditions for each join must be
    + * collected before checking if it is a cartesian product. If you have
    + * SELECT * from R, S where R.r = S.s,
    + * the join between R and S is not a cartesian product and therefore should be allowed.
    + * The predicate R.r = S.s is not recognized as a join condition until the ReorderJoin rule.
    + */
    +case class CheckCartesianProducts(conf: CatalystConf)
    --- End diff --

    If we do it here, it does not cover all the cases, right? `Cartesian Product` might be chosen based on the statistics.
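The doc comment quoted in the diff above says a join such as `SELECT * from R, S where R.r = S.s` is not a cartesian product once `R.r = S.s` is recognized as a join condition. A minimal Java sketch of that idea follows. It is not Spark's `CheckCartesianProducts` rule: the class `CartesianCheckSketch`, the method `isCartesian`, and the modelling of a predicate as the list of column names it references are all assumptions made for illustration. It also says nothing about the reviewer's concern that a cartesian product may be chosen later based on statistics.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical toy model, not Spark code: a predicate is represented only by
// the column names it references.
final class CartesianCheckSketch {

  // A join is treated as a cartesian product when no predicate references
  // columns from both the left and the right side.
  static boolean isCartesian(Collection<String> leftCols,
                             Collection<String> rightCols,
                             Collection<? extends Collection<String>> predicates) {
    for (Collection<String> refs : predicates) {
      boolean touchesLeft = false;
      boolean touchesRight = false;
      for (String col : refs) {
        if (leftCols.contains(col)) touchesLeft = true;
        if (rightCols.contains(col)) touchesRight = true;
      }
      if (touchesLeft && touchesRight) {
        return false; // found a real join condition
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> r = Arrays.asList("R.r");
    List<String> s = Arrays.asList("S.s");
    // SELECT * from R, S where R.r = S.s
    System.out.println(isCartesian(r, s, Arrays.asList(Arrays.asList("R.r", "S.s")))); // false
    // SELECT * from R, S where R.r = 1 (predicate touches only one side)
    System.out.println(isCartesian(r, s, Arrays.asList(Arrays.asList("R.r")))); // true
  }
}
```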
[GitHub] spark pull request #16746: [SPARK-15648][SQL] Add teradataDialect for JDBC c...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16746#discussion_r98751002

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.jdbc
    +
    +import java.sql.Types
    +import org.apache.spark.sql.types._
    +
    +
    +private case object TeradataDialect extends JdbcDialect {
    +
    +  override def canHandle(url: String): Boolean = { url.startsWith("jdbc:teradata") }
    +
    +  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    +    case StringType => Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR))
    +    case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
    +    case _ => None
    +  }
    --- End diff --

    What about `isCascadingTruncateTable`? Could you check whether Teradata cascades by default for the `TRUNCATE TABLE` statement?
[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98748723

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---
    @@ -54,6 +55,18 @@ trait StatefulOperator extends SparkPlan {
       }
     }

    +trait StateStoreReader extends StatefulOperator {
    --- End diff --

    This file should probably be renamed from StatefulAggregation to StatefulOperations.
[GitHub] spark pull request #16746: [SPARK-15648][SQL] Add teradataDialect for JDBC c...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16746#discussion_r98750209

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.jdbc
    +
    +import java.sql.Types
    +import org.apache.spark.sql.types._
    +
    +
    +private case object TeradataDialect extends JdbcDialect {
    +
    +  override def canHandle(url: String): Boolean = { url.startsWith("jdbc:teradata") }
    +
    +  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    +    case StringType => Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR))
    +    case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
    +    case _ => None
    +  }
    --- End diff --

    +1
[GitHub] spark issue #15237: [SPARK-17663] [CORE] SchedulableBuilder should handle in...
Github user erenavsarogullari commented on the issue:

    https://github.com/apache/spark/pull/15237

    Hi @squito,

    Sorry, I am quite busy this week, so I am happy for this PR to be merged now if that is also OK with you. I plan to address fileName logging in a separate JIRA.

    It would also be useful to add logging that informs the user in the following cases:
    - If the `schedulerAllocFile` property is not set and `DEFAULT_SCHEDULER_FILE` is used,
    - If the `schedulerAllocFile` property is not set and `DEFAULT_SCHEDULER_FILE` does not exist,
    - If the file given by `schedulerAllocFile` does not exist.

    So I am thinking of creating a single JIRA for `FairSchedulableBuilder` logging improvements that covers both fileName logging (when the file is found successfully) and the failure cases above.
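The cases listed in the comment above can be sketched as a small decision function. This is only an illustration in Java under stated assumptions: `SchedulerFileLogSketch`, `describe`, the message wording, and the `exists` predicate are hypothetical, the value `"fairscheduler.xml"` for `DEFAULT_SCHEDULER_FILE` is assumed here, and nothing below is the actual `FairSchedulableBuilder` code.

```java
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch of the proposed logging cases; not Spark's FairSchedulableBuilder.
final class SchedulerFileLogSketch {

  // Assumed default file name, used only for this illustration.
  static final String DEFAULT_SCHEDULER_FILE = "fairscheduler.xml";

  // Returns the log line that would be emitted for the given configuration.
  // `exists` stands in for a file or classpath lookup.
  static String describe(Optional<String> schedulerAllocFile, Predicate<String> exists) {
    if (schedulerAllocFile.isPresent()) {
      String file = schedulerAllocFile.get();
      return exists.test(file)
          ? "INFO: using scheduler allocation file " + file
          : "WARN: scheduler allocation file " + file + " does not exist";
    }
    return exists.test(DEFAULT_SCHEDULER_FILE)
        ? "INFO: schedulerAllocFile not set, using default " + DEFAULT_SCHEDULER_FILE
        : "WARN: schedulerAllocFile not set and default " + DEFAULT_SCHEDULER_FILE
            + " does not exist";
  }

  public static void main(String[] args) {
    // Property not set and no default file available.
    System.out.println(describe(Optional.empty(), f -> false));
    // Property set and the file is found.
    System.out.println(describe(Optional.of("pools.xml"), f -> true));
  }
}
```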
[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16758

    Test FAILed.
    Refer to this link for build results (access rights to CI server needed):
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72205/
    Test FAILed.
[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16758

    **[Test build #72205 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72205/testReport)** for PR 16758 at commit [`6fab7a5`](https://github.com/apache/spark/commit/6fab7a5fde75309198d1e73e66948d82d0f590e6).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.
[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16758

    Merged build finished. Test FAILed.
[GitHub] spark pull request #16690: [SPARK-19347] ReceiverSupervisorImpl can add bloc...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16690#discussion_r98747972

    --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala ---
    @@ -75,10 +106,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
        * @tparam T type of the reply message
        * @return the reply message from the corresponding [[RpcEndpoint]]
        */
    +  @deprecated("use 'askSync' instead.", "2.1.0")
    --- End diff --

    2.2.0
[GitHub] spark pull request #16690: [SPARK-19347] ReceiverSupervisorImpl can add bloc...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16690#discussion_r98748039

    --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala ---
    @@ -19,6 +19,7 @@ package org.apache.spark.rpc

     import scala.concurrent.Future
     import scala.reflect.ClassTag
    +import scala.util.control.NonFatal
    --- End diff --

    not used
[GitHub] spark pull request #16690: [SPARK-19347] ReceiverSupervisorImpl can add bloc...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16690#discussion_r98747902

    --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala ---
    @@ -91,6 +123,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
        * @tparam T type of the reply message
        * @return the reply message from the corresponding [[RpcEndpoint]]
        */
    +  @deprecated("use 'askSync' instead.", "2.1.0")
    --- End diff --

    2.2.0
[GitHub] spark issue #16751: [SPARK-19409][BUILD] Bump parquet version to 1.8.2
Github user dongjoon-hyun commented on the issue:

    https://github.com/apache/spark/pull/16751

    Ya, @mallman. However, for the same reason, I decided to keep them out of this PR, which is exactly the opposite of your opinion. If we try to fix all of them in a single shot, it will not be merged for a long time. At least you can see that #16756 has already started. I think those workarounds are going to be cleaned up soon, as long as this commit is not reverted for some reason. :)
[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16758

    **[Test build #72205 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72205/testReport)** for PR 16758 at commit [`6fab7a5`](https://github.com/apache/spark/commit/6fab7a5fde75309198d1e73e66948d82d0f590e6).
[GitHub] spark pull request #16758: Arbitrary stateful operations with MapGroupsWithS...
GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/16758

    Arbitrary stateful operations with MapGroupsWithState

    ## What changes were proposed in this pull request?

    `mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState`.

    *Requirements*
    - Users should be able to specify a function that can do the following:
      - Access the input row corresponding to a key
      - Access the previous state corresponding to a key
      - Optionally, update or remove the state
      - Output any number of new rows (or none at all)

    *Proposed API*
    ```
    // New methods on KeyValueGroupedDataset
    class KeyValueGroupedDataset[K, V] {
      // Scala friendly
      def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => U)
      def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => Iterator[U])
      // Java friendly
      def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
      def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
    }

    // --- New Java-friendly function classes ---
    public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
      R call(K key, Iterator<V> values, State<S> state) throws Exception;
    }
    public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
      Iterator<R> call(K key, Iterator<V> values, State<S> state) throws Exception;
    }

    // -- Wrapper class for state data --
    trait State[S] {
      def exists(): Boolean
      def get(): S                    // throws Exception if state does not exist
      def getOption(): Option[S]
      def update(newState: S): Unit
      def remove(): Unit              // exists() will be false after this
    }
    ```

    Key Semantics of the State class
    - The state can be null.
    - If state.remove() is called, then state.exists() will return false, and getOption will return None.
    - After state.update(newState) is called, state.exists() will return true, and getOption will return Some(...).
    - None of the operations are thread-safe. This is to avoid memory barriers.

    *Usage*
    ```
    val stateFunc = (word: String, words: Iterator[String], runningCount: State[Long]) => {
      val newCount = words.size + runningCount.getOption.getOrElse(0L)
      runningCount.update(newCount)
      (word, newCount)
    }

    dataset                                                 // type is Dataset[String]
      .groupByKey[String](w => w)                           // generates KeyValueGroupedDataset[String, String]
      .mapGroupsWithState[Long, (String, Long)](stateFunc)  // returns Dataset[(String, Long)]
    ```

    ## How was this patch tested?
    New unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark mapWithState

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16758

commit d10da2700a970c82537b678db34b2d80cebebcc8
Author: Tathagata Das
Date:   2017-01-18T00:45:54Z

    Prototype - almost working

commit 78cd1853033a091b62cb350879bb6c3a0b6c8641
Author: Tathagata Das
Date:   2017-01-18T01:05:51Z

    Renamed to mapGroupsWithState

commit 0c22e08a8f9ad66a49bc939652fee14577f9bd4b
Author: Tathagata Das
Date:   2017-01-18T01:56:07Z

    Fixed bugs

commit 52e14e479ffa1e38c9efc5b063a95831caab6997
Author: Tathagata Das
Date:   2017-01-18T10:52:20Z

    Removed prints

commit 529aefe6d7cb9cd54e20c0cdaa11cec90a4f16be
Author: Tathagata Das
Date:   2017-01-18T10:58:27Z

    Test state remove

commit 57f5e8d2e8a74a8269667a9d4d89971eb9107c07
Author: Tathagata Das
Date:   2017-01-18T20:26:53Z

    Test restart, and test with metrics

commit 3e0d8dcfa81d58ae6ca6754cc54c19383179802a
Author: Tathagata Das
Date:   2017-01-21T02:25:29Z

    Fixed everything

commit b54fa230eda141316713e3b1d1c56d8a28fd3a6c
Author: Tathagata Das
Date:   2017-01-30T02:06:00Z

    Refactored, added java APIs and tests

commit ab3cb6c961f0d861a24c2146dcb9dc0380c8adc9
Author: Tathagata Das
Date:   2017-01-30T05:07:19Z

    Refactored

commit ddf4550b765af89a4ed7d80edabfe3370cbd1e23
Author: Tathagata Das
Date:   2017-01-30T21:29:44Z

    Added more test

commit 3133f83195ca562e866f6565cdfbd59e118cb6aa
Author: Tathagata Das
Date:   2017-01-31T18:53:58Z

    Merge remote-tracking bran
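The state semantics described in the pull request above (exists, get, getOption, update, remove) can be tried out without Spark. The following is a minimal Java sketch under stated assumptions: `RunningCountSketch`, `InMemoryState`, and `updateCount` are hypothetical names, the class is a plain in-memory stand-in rather than the proposed `State` wrapper, and, unlike the proposal, it does not model a null state.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;

final class RunningCountSketch {

  // Hypothetical in-memory stand-in for the proposed State wrapper (not Spark's class).
  // Assumption: a null state is not modeled, so "no state" is represented by null.
  static final class InMemoryState<S> {
    private S value;

    boolean exists() { return value != null; }

    S get() {
      if (value == null) throw new NoSuchElementException("state does not exist");
      return value;
    }

    Optional<S> getOption() { return Optional.ofNullable(value); }

    void update(S newState) { value = Objects.requireNonNull(newState); }

    void remove() { value = null; } // exists() is false after this
  }

  // Mirrors the usage example: adds the number of values in this batch to the stored count.
  static long updateCount(String word, Iterator<String> words, InMemoryState<Long> runningCount) {
    long batchSize = 0;
    while (words.hasNext()) {
      words.next();
      batchSize++;
    }
    long newCount = batchSize + runningCount.getOption().orElse(0L);
    runningCount.update(newCount);
    return newCount;
  }

  public static void main(String[] args) {
    InMemoryState<Long> state = new InMemoryState<>();
    System.out.println(updateCount("a", Arrays.asList("a", "a").iterator(), state)); // 2
    System.out.println(updateCount("a", Arrays.asList("a").iterator(), state));      // 3
    state.remove();
    System.out.println(state.exists());                                              // false
  }
}
```

In this sketch the state is kept per call site, whereas the proposal keeps one state per key. The sketch only illustrates how a function reads the previous state, updates it, and observes `remove()`.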
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16603#discussion_r98743557

    --- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
    @@ -144,23 +148,53 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
           // spilling, avoid to have too many spilled files.
           if (got < required) {
             // Call spill() on other consumers to release memory
    +        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
    +        // which is just spilled in last few times and re-spilling on it will produce many small
    +        // spill files.
    +        TreeMap<Long, List<MemoryConsumer>> sortedConsumers =
    +          new TreeMap<Long, List<MemoryConsumer>>();
             for (MemoryConsumer c: consumers) {
               if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
    -            try {
    -              long released = c.spill(required - got, consumer);
    -              if (released > 0) {
    -                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
    -                  Utils.bytesToString(released), c, consumer);
    -                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
    -                if (got >= required) {
    -                  break;
    -                }
    +            long key = c.getUsed();
    +            List<MemoryConsumer> list = null;
    --- End diff --

    ```
    list = sorted.get(key);
    if (list == null) {
      // instantiate, add to map
    }
    list.add(...);
    ```

    This is slightly cheaper and shorter than your code.
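The pattern suggested in the review comment above (one `get`, a null check, then `add`) can be written out as a self-contained Java sketch. Consumers are modelled here by plain name strings, and `GroupByUsageSketch` and `group` are hypothetical names for illustration; this is not the `TaskMemoryManager` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: consumers are represented by names, keyed by bytes used.
final class GroupByUsageSketch {

  // Groups names by their usage with a single map lookup per element,
  // instead of containsKey() followed by get().
  static TreeMap<Long, List<String>> group(long[] used, String[] names) {
    TreeMap<Long, List<String>> sorted = new TreeMap<>();
    for (int i = 0; i < used.length; i++) {
      List<String> list = sorted.get(used[i]);
      if (list == null) {
        // First consumer with this usage: instantiate the list and add it to the map.
        list = new ArrayList<>(1);
        sorted.put(used[i], list);
      }
      list.add(names[i]);
    }
    return sorted;
  }

  public static void main(String[] args) {
    TreeMap<Long, List<String>> sorted =
        group(new long[] {40L, 10L, 40L}, new String[] {"a", "b", "c"});
    System.out.println(sorted); // {10=[b], 40=[a, c]}
  }
}
```

On Java 8 and later, `Map.computeIfAbsent` expresses the same single-lookup idea in one call.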
[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/15009

    Test FAILed.
    Refer to this link for build results (access rights to CI server needed):
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72204/
    Test FAILed.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16603#discussion_r98744044

    --- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
    @@ -144,23 +148,53 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
           // spilling, avoid to have too many spilled files.
           if (got < required) {
             // Call spill() on other consumers to release memory
    +        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
    +        // which is just spilled in last few times and re-spilling on it will produce many small
    +        // spill files.
    +        TreeMap<Long, List<MemoryConsumer>> sortedConsumers =
    +          new TreeMap<Long, List<MemoryConsumer>>();
             for (MemoryConsumer c: consumers) {
               if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
    -            try {
    -              long released = c.spill(required - got, consumer);
    -              if (released > 0) {
    -                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
    -                  Utils.bytesToString(released), c, consumer);
    -                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
    -                if (got >= required) {
    -                  break;
    -                }
    +            long key = c.getUsed();
    +            List<MemoryConsumer> list = null;
    +            if (sortedConsumers.containsKey(key)) {
    +              list = sortedConsumers.get(key);
    +              list.add(c);
    +            } else {
    +              list = new ArrayList<MemoryConsumer>(1);
    +              list.add(c);
    +              sortedConsumers.put(key, list);
    +            }
    +          }
    +        }
    +        while (!sortedConsumers.isEmpty()) {
    +          // Get the consumer using the least memory more than the remaining required memory.
    +          Map.Entry<Long, List<MemoryConsumer>> currentEntry =
    +            sortedConsumers.ceilingEntry(required - got);
    +          // No consumer has used memory more than the remaining required memory.
    +          // Get the consumer of largest used memory.
    +          if (currentEntry == null) {
    +            currentEntry = sortedConsumers.lastEntry();
    +          }
    +          List<MemoryConsumer> cList = currentEntry.getValue();
    +          MemoryConsumer c = cList.remove(cList.size() - 1);
    +          if (cList.size() == 0) {
    --- End diff --

    `cList.isEmpty()`
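The selection loop in the diff above picks the consumer with the least usage that still covers the remaining requirement (`ceilingEntry`) and falls back to the largest consumer (`lastEntry`) when none is big enough. A minimal Java sketch of that selection order follows. It rests on assumptions that are not in the patch: consumers are plain name strings, spilling a consumer is assumed to free everything it holds, the loop stops once enough has been freed, and `SpillOrderSketch` and `pickOrder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the ceilingEntry/lastEntry selection; not TaskMemoryManager itself.
final class SpillOrderSketch {

  // Returns the usage values of the consumers in the order they would be picked.
  // Assumption: picking a consumer frees all of the memory it uses.
  static List<Long> pickOrder(TreeMap<Long, List<String>> sorted, long required) {
    List<Long> order = new ArrayList<>();
    long got = 0;
    while (!sorted.isEmpty() && got < required) {
      // Smallest consumer whose usage is at least the remaining requirement.
      Map.Entry<Long, List<String>> entry = sorted.ceilingEntry(required - got);
      if (entry == null) {
        // No single consumer is large enough: take the largest one.
        entry = sorted.lastEntry();
      }
      List<String> names = entry.getValue();
      names.remove(names.size() - 1);
      if (names.isEmpty()) {
        sorted.remove(entry.getKey());
      }
      order.add(entry.getKey());
      got += entry.getKey();
    }
    return order;
  }

  public static void main(String[] args) {
    TreeMap<Long, List<String>> sorted = new TreeMap<>();
    sorted.put(10L, new ArrayList<>(Arrays.asList("a")));
    sorted.put(20L, new ArrayList<>(Arrays.asList("b")));
    sorted.put(30L, new ArrayList<>(Arrays.asList("c")));
    // Nothing covers 50 alone, so 30 is taken first; then 20 covers the remaining 20.
    System.out.println(pickOrder(sorted, 50L)); // [30, 20]
  }
}
```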
[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/15009

    Merged build finished. Test FAILed.
[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16603#discussion_r98743237

    --- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
    @@ -144,23 +148,53 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
           // spilling, avoid to have too many spilled files.
           if (got < required) {
             // Call spill() on other consumers to release memory
    +        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
    +        // which is just spilled in last few times and re-spilling on it will produce many small
    +        // spill files.
    +        TreeMap<Long, List<MemoryConsumer>> sortedConsumers =
    +          new TreeMap<Long, List<MemoryConsumer>>();
    --- End diff --

    nit: you could just have used `new TreeMap<>()`.
[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/15009

    **[Test build #72204 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72204/testReport)** for PR 15009 at commit [`2fdcec9`](https://github.com/apache/spark/commit/2fdcec97f2455983684af8a2e6bcbe4214991972).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/15009

    **[Test build #72204 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72204/testReport)** for PR 15009 at commit [`2fdcec9`](https://github.com/apache/spark/commit/2fdcec97f2455983684af8a2e6bcbe4214991972).
[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/15009

    Test PASSed.
    Refer to this link for build results (access rights to CI server needed):
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72201/
    Test PASSed.
[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/15009

    Build finished. Test PASSed.
[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/15009

    **[Test build #72201 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72201/consoleFull)** for PR 15009 at commit [`6a7ba5b`](https://github.com/apache/spark/commit/6a7ba5bfdd2cb165956992907f681ab3ad85154e).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.
[GitHub] spark issue #16751: [SPARK-19409][BUILD] Bump parquet version to 1.8.2
Github user mallman commented on the issue: https://github.com/apache/spark/pull/16751 FYI, there are at least two workarounds in the Spark codebase which can potentially be removed as a consequence of this upgrade. For example: https://github.com/apache/spark/blob/5de1737b02710e36f6804d2ae243d1aeb30a0b32/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L549-L558 and https://github.com/apache/spark/blob/ca6391637212814b7c0bd14c434a6737da17b258/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala#L175-L178 These come immediately to mind. There may be others. I think this PR would have been a good opportunity to remove these workarounds, but it's been closed and merged so that's water under the bridge.
[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16757 cc @cloud-fan @windpiger @sameeragarwal
[GitHub] spark pull request #14725: [SPARK-17161] [PYSPARK][ML] Add PySpark-ML JavaWr...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/14725#discussion_r98736777
--- Diff: python/pyspark/ml/wrapper.py ---
@@ -16,6 +16,10 @@
 #
 from abc import ABCMeta, abstractmethod
+import sys
--- End diff --
Does this look ok now @holdenk?
[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16757 **[Test build #72203 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72203/testReport)** for PR 16757 at commit [`6aad5d8`](https://github.com/apache/spark/commit/6aad5d844b26f675cf13d3c898da785271ffcc7c).
[GitHub] spark pull request #15120: [SPARK-4563][core] Allow driver to advertise a di...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/15120#discussion_r98734873
--- Diff: core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala ---
@@ -66,7 +66,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con
       findEntry(key) match {
         case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
         case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
-        case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
+        case e: FallbackConfigEntry[_] => get(e.fallback.key)
--- End diff --
What issue? The code you're commenting on does not exist in 1.6. If you're having issues, please ask questions on the mailing lists or use the bug tracker.
[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16757 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72202/ Test FAILed.
[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16757 Merged build finished. Test FAILed.
[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16757 **[Test build #72202 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72202/testReport)** for PR 16757 at commit [`dac7ec9`](https://github.com/apache/spark/commit/dac7ec99075ce98ebea92e108ad66b05537de396). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Pinging @tdas on this-- looks like you're the committer who has contributed the most to kinesis-asl.
[GitHub] spark issue #16281: [SPARK-13127][SQL] Update Parquet to 1.9.0
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/16281 Thank you for sharing that information, @mallman .
[GitHub] spark pull request #16735: [SPARK-19228][SQL] Introduce tryParseDate method ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16735#discussion_r98729386
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala ---
@@ -140,12 +137,21 @@ private[csv] object CSVInferSchema {
     }
   }
 
+  private def tryParseDate(field: String, options: CSVOptions): DataType = {
+    // This case infers a custom `dateFormat` is set.
+    if ((allCatch opt options.dateFormat.parse(field)).isDefined) {
+      DateType
+    } else {
+      tryParseTimestamp(field, options)
+    }
+  }
+
   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    // This case infers a custom `dataFormat` is set.
+    // This case infers a custom `timestampFormat` is set.
     if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
       TimestampType
     } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
-      // We keep this for backwords competibility.
+      // We keep this for backwards compatibility.
       TimestampType
     } else {
       tryParseBoolean(field, options)
--- End diff --
@HyukjinKwon That does not work correctly, if we put the change there.
[GitHub] spark issue #16281: [SPARK-13127][SQL] Update Parquet to 1.9.0
Github user mallman commented on the issue: https://github.com/apache/spark/pull/16281 FYI, we've been using 1.9.0 patched with a fix for https://issues.apache.org/jira/browse/PARQUET-783 without problem.
[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16757 **[Test build #72202 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72202/testReport)** for PR 16757 at commit [`dac7ec9`](https://github.com/apache/spark/commit/dac7ec99075ce98ebea92e108ad66b05537de396).