[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/13382#discussion_r71262847 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala --- @@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter( extends OutputStream with Logging { + /** + * Guards against close calls, e.g. from a wrapping stream. + * Call manualClose to close the stream that was extended by this trait. --- End diff -- Also add a comment that this is needed to support resuming writes after a commit(). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
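The trait under review makes `close()` only flush. A wrapping stream can therefore be closed at each `commit()` while the underlying stream stays open for later writes, and only `manualClose()` actually closes it. Below is a minimal sketch of that pattern. It assumes a hypothetical `RecordingStream` as a stand-in for the file stream. It also uses plain `override` instead of the diff's `abstract override`, which is enough here because `OutputStream.close()` is concrete.

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream, OutputStream}

// Hypothetical stand-in for the underlying file stream: records bytes and
// whether it was really closed.
class RecordingStream extends OutputStream {
  val bytes = new ByteArrayOutputStream()
  var closed = false
  override def write(b: Int): Unit = {
    require(!closed, "stream already closed")
    bytes.write(b)
  }
  override def close(): Unit = closed = true
}

// Same shape as the trait in the diff: close() only flushes, while
// manualClose() forwards to the next close() in the linearization.
trait ManualCloseOutputStream extends OutputStream {
  override def close(): Unit = flush()
  def manualClose(): Unit = super.close()
}

object ManualCloseDemo {
  def run(): (String, Boolean, Boolean) = {
    val underlying = new RecordingStream
    val mcs = new BufferedOutputStream(underlying, 16) with ManualCloseOutputStream

    mcs.write("first".getBytes("UTF-8"))
    mcs.close() // e.g. a wrapping stream closed at commit(): only flushes
    val openAfterCommit = !underlying.closed

    mcs.write("second".getBytes("UTF-8")) // writing resumes after the commit
    mcs.manualClose() // BufferedOutputStream.close(): flush, then close underlying
    (underlying.bytes.toString("UTF-8"), openAfterCommit, underlying.closed)
  }
}
```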
[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/13382#discussion_r71262784 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala --- @@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter( extends OutputStream with Logging { + /** + * Guards against close calls, e.g. from a wrapping stream. + * Call manualClose to close the stream that was extended by this trait. + */ + private trait ManualCloseOutputStream extends OutputStream { +abstract override def close(): Unit = { + flush() +} + +def manualClose(): Unit = { + super.close() +} + } + /** The file channel, used for repositioning / truncating the file. */ private var channel: FileChannel = null + private var mcs: ManualCloseOutputStream = null private var bs: OutputStream = null private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null private var initialized = false + private var streamOpen = false private var hasBeenClosed = false - private var commitAndCloseHasBeenCalled = false /** * Cursors used to represent positions in the file. * - * ||--- | - * ^^ ^ - * ||finalPosition - * | reportedPosition - * initialPosition + * ||---| --- End diff -- Could you update the diagram? I think this is misleading since reportedPosition will always be ahead of committedPosition except during some internal processing.
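The reviewer's point is about ordering: apart from transient internal states, `committedPosition` never runs ahead of `reportedPosition`. The sketch below is a hypothetical model of the cursors. It assumes metrics are reported every `reportEvery` writes, and it assumes `reportedPosition` never passes the current file position. `CursorModel`, `Segment`, and the reporting interval are illustrations, not the writer's API.

```scala
// Hypothetical model of the writer's cursors; not Spark's implementation.
final case class Segment(offset: Long, length: Long)

final class CursorModel(reportEvery: Int) {
  private var position = 0L          // stands in for the file's current position
  private var reportedPosition = 0L  // position at the last metrics update
  private var committedPosition = 0L // end of the last committed write
  private var writesSinceReport = 0

  def write(bytes: Long): Unit = {
    position += bytes
    writesSinceReport += 1
    if (writesSinceReport >= reportEvery) { // assumed periodic metrics update
      reportedPosition = position
      writesSinceReport = 0
    }
  }

  // Commit everything written since the last commit as one segment.
  def commit(): Segment = {
    val segment = Segment(committedPosition, position - committedPosition)
    committedPosition = position
    reportedPosition = position
    writesSinceReport = 0
    segment
  }

  // committed <= reported <= current position between operations.
  def cursorsOrdered: Boolean =
    committedPosition <= reportedPosition && reportedPosition <= position
}
```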
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/12836 No, go ahead and submit one :)
[GitHub] spark pull request #14207: [SPARK-16552] [SQL] [WIP] Store the Inferred Sche...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14207#discussion_r71262073 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -270,6 +291,11 @@ case class CreateDataSourceTableAsSelectCommand( } } +case class SchemaType private(name: String) +object SchemaType { --- End diff -- Sure, will do it.
[GitHub] spark pull request #14102: [SPARK-16434][SQL] Avoid per-record type dispatch...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/14102#discussion_r71261575 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala --- @@ -60,13 +60,13 @@ private[sql] object InferSchema { } } catch { case _: JsonParseException if shouldHandleCorruptRecord => -Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType +Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType case _: JsonParseException => None } } }.fold(StructType(Seq()))( - compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)) + compatibleRootType(columnNameOfCorruptRecord, shouldHandleCorruptRecord)) --- End diff -- The name is `columnNameOfCorruptRecord` across the JSON data source, not `columnNameOfCorruptRecords`.
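The code above infers one schema per record and folds the results together. A record that fails to parse contributes a single string column named by `columnNameOfCorruptRecord` when corrupt records are handled, and nothing otherwise. The following simplified sketch shows that shape. It swaps `StructType` for a plain name-to-type map and uses a toy `name:type,...` record format instead of JSON. It also replaces `compatibleRootType` with a hypothetical merge that widens conflicting types to `string`.

```scala
// Simplified model of per-record inference plus fold; not Spark's code.
object InferSketch {
  type Schema = Map[String, String] // field name -> type name

  // Toy record format "a:int,b:string"; anything else counts as malformed.
  private val Field = """(\w+):(\w+)""".r

  def parse(record: String): Option[Schema] = {
    val parts = record.split(",").toSeq.map(_.trim)
    val fields = parts.collect { case Field(name, tpe) => name -> tpe }
    if (record.trim.nonEmpty && fields.size == parts.size) Some(fields.toMap) else None
  }

  // Same branching as the diff: a corrupt record yields one string column,
  // or is dropped when corrupt records are not handled.
  def inferRecord(record: String, columnNameOfCorruptRecord: String,
      shouldHandleCorruptRecord: Boolean): Option[Schema] =
    parse(record) match {
      case some @ Some(_) => some
      case None if shouldHandleCorruptRecord => Some(Map(columnNameOfCorruptRecord -> "string"))
      case None => None
    }

  // Hypothetical merge: union of fields, conflicting types widen to "string".
  def merge(a: Schema, b: Schema): Schema =
    (a.keySet ++ b.keySet).map { k =>
      val t = (a.get(k), b.get(k)) match {
        case (Some(x), Some(y)) if x != y => "string"
        case (x, y) => x.orElse(y).get
      }
      k -> t
    }.toMap

  def infer(records: Seq[String], columnNameOfCorruptRecord: String,
      shouldHandleCorruptRecord: Boolean): Schema =
    records
      .flatMap(inferRecord(_, columnNameOfCorruptRecord, shouldHandleCorruptRecord))
      .fold(Map.empty[String, String])(merge)
}
```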
[GitHub] spark pull request #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/14132#discussion_r71261455 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1774,6 +1775,51 @@ class Analyzer( } /** + * Substitute Hints. + * - BROADCAST/BROADCASTJOIN/MAPJOIN match the closest table with the given name parameters. + */ + object SubstituteHints extends Rule[LogicalPlan] { +def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case logical: LogicalPlan => logical transformDown { +case h @ Hint(name, parameters, child) +if Seq("BROADCAST", "BROADCASTJOIN", "MAPJOIN").contains(name.toUpperCase) => + var resolvedChild = child + + for (param <- parameters) { +val names = param.split("\\.") +val tid = if (names.length > 1) { + TableIdentifier(names(1), Some(names(0))) +} else { + TableIdentifier(param, None) +} +try { + catalog.lookupRelation(tid) + + var stop = false + resolvedChild = resolvedChild.transformDown { +case r @ BroadcastHint(SubqueryAlias(t, _)) + if !stop && resolver(t, tid.identifier) => + stop = true + r +case r @ SubqueryAlias(t, _) if !stop && resolver(t, tid.identifier) => + stop = true + BroadcastHint(r) --- End diff -- Sure, if you want to remove this, I can simplify it. It's a bit of legacy code; I tried to follow your advice as much as possible. (As I mentioned before, I decided to block this in the parser layer after I found that Hive does.)
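`SubstituteHints` wraps only the closest matching alias. `transformDown` visits nodes top-down, and the `stop` flag blocks every later match, including the alias that becomes the child of the freshly inserted hint. The toy version below assumes a hypothetical mini plan ADT. Its `transformDown`, like Catalyst's, applies the rule to a node and then recurses into the children of the result. The case-insensitive `resolver` mirrors Spark's default (case-insensitive) analysis.

```scala
// Toy plan tree; node names are hypothetical, loosely modelled on Catalyst.
object ClosestHintSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    def withChildren(cs: Seq[Plan]): Plan
    // Pre-order rewrite: rule on this node first, then on the result's children.
    def transformDown(rule: PartialFunction[Plan, Plan]): Plan = {
      val afterRule = rule.applyOrElse(this, (p: Plan) => p)
      afterRule.withChildren(afterRule.children.map(_.transformDown(rule)))
    }
  }
  final case class Relation(name: String) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(cs: Seq[Plan]): Plan = this
  }
  final case class Alias(alias: String, child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(cs: Seq[Plan]): Plan = copy(child = cs.head)
  }
  final case class Broadcast(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(cs: Seq[Plan]): Plan = copy(child = cs.head)
  }
  final case class Join(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
    def withChildren(cs: Seq[Plan]): Plan = Join(cs(0), cs(1))
  }

  def resolver(a: String, b: String): Boolean = a.equalsIgnoreCase(b)

  // Mark only the first alias (in pre-order) whose name matches `table`.
  def markClosest(plan: Plan, table: String): Plan = {
    var stop = false
    plan.transformDown {
      case b @ Broadcast(Alias(t, _)) if !stop && resolver(t, table) =>
        stop = true // already hinted: keep it and stop looking
        b
      case a @ Alias(t, _) if !stop && resolver(t, table) =>
        stop = true // without this, the wrapped alias would match again
        Broadcast(a)
    }
  }
}
```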
[GitHub] spark issue #14065: [SPARK-14743][YARN] Add a configurable token manager for...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/14065 Thanks @tgravescs for your comments, I will add the docs about it.
[GitHub] spark issue #14102: [SPARK-16434][SQL] Avoid per-record type dispatch in JSO...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14102 **[Test build #62501 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62501/consoleFull)** for PR 14102 at commit [`cfe6bed`](https://github.com/apache/spark/commit/cfe6beda1a1db64aab5d2f84a68a5ee1e2bdd905).
[GitHub] spark issue #14102: [SPARK-16434][SQL] Avoid per-record type dispatch in JSO...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/14102 @yhuai the commits I pushed include the changes below: - Reverts the changes in `JSONOptions` about `columnNameOfCorruptRecord` https://github.com/apache/spark/pull/14102#discussion_r71095725. - Changes `skipFieldNameIfExists` to `convertField` with documentation https://github.com/apache/spark/pull/14102#discussion_r71096761. - Adds `convertValue` for `null` checking, so the weird dirty comparison https://github.com/apache/spark/pull/14102#discussion_r71097210 was removed. - Adds an example for https://github.com/apache/spark/pull/14102#discussion_r71096571 - Corrects style nits.
[GitHub] spark pull request #14207: [SPARK-16552] [SQL] [WIP] Store the Inferred Sche...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14207#discussion_r71261112 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -270,6 +291,11 @@ case class CreateDataSourceTableAsSelectCommand( } } +case class SchemaType private(name: String) +object SchemaType { --- End diff -- Will we have more schema types? If not, I think a boolean flag `isSchemaInferred` would be enough.
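The question is whether a closed set of schema types is worth it compared with a boolean. The sketch below puts the two shapes side by side. The private-constructor case class mirrors the diff. The value names `USER` and `INFERRED` and the `describeA`/`describeB` helpers are hypothetical.

```scala
object SchemaTypeSketch {
  // Option A: closed set of named values; the companion can call the
  // private constructor, other code is meant to use the predefined values.
  case class SchemaType private (name: String)
  object SchemaType {
    val USER = new SchemaType("USER")         // hypothetical value name
    val INFERRED = new SchemaType("INFERRED") // hypothetical value name
  }

  def describeA(t: SchemaType): String = t match {
    case SchemaType.USER     => "schema specified by the user"
    case SchemaType.INFERRED => "schema inferred from the data"
    case other               => s"unknown schema type ${other.name}"
  }

  // Option B: the suggested boolean flag, enough while only two cases exist.
  def describeB(isSchemaInferred: Boolean): String =
    if (isSchemaInferred) "schema inferred from the data" else "schema specified by the user"
}
```

The boolean is simpler while there are exactly two states. The named values only pay off if more schema types are expected, which is the point of the question.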
[GitHub] spark pull request #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/14132#discussion_r71261042 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala --- @@ -425,6 +452,49 @@ class SQLBuilder(logicalPlan: LogicalPlan) extends Logging { } } +/** + * Merge and move upward to the nearest Project. + * A broadcast hint comment is scattered into multiple nodes inside the plan, and the + * information of BroadcastHint resides its current position inside the plan. In order to + * reconstruct broadcast hint comment, we need to pack the information of BroadcastHint into + * Hint("BROADCAST", _, _) and collect them up by moving upward to the nearest Project node. + */ +object NormalizeBroadcastHint extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +// Capture the broadcasted information and store it in Hint. +case BroadcastHint(child @ SubqueryAlias(_, Project(_, SQLTable(database, table, _, _ => + Hint("BROADCAST", Seq(table), child) + +// Nearest Project is found. +case p @ Project(_, Hint(_, _, _)) => p + +// Merge BROADCAST hints up to the nearest Project. +case Hint("BROADCAST", params1, h @ Hint("BROADCAST", params2, _)) => + h.copy(parameters = params1 ++ params2) +case j @ Join(h1 @ Hint("BROADCAST", p1, left), h2 @ Hint("BROADCAST", p2, right), _, _) => + h1.copy(parameters = p1 ++ p2, child = j.copy(left = left, right = right)) + +// Bubble up BROADCAST hints to the nearest Project. 
+case j @ Join(h @ Hint("BROADCAST", _, hintChild), _, _, _) => + h.copy(child = j.copy(left = hintChild)) +case j @ Join(_, h @ Hint("BROADCAST", _, hintChild), _, _) => + h.copy(child = j.copy(right = hintChild)) +case s @ SubqueryAlias(_, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = s.copy(child = hintChild)) +case ll @ LocalLimit(_, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = ll.copy(child = hintChild)) +case f @ Filter(_, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = f.copy(child = hintChild)) +case a @ Aggregate(_, _, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = a.copy(child = hintChild)) +case s @ Sort(_, _, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = s.copy(child = hintChild)) +case g @ Generate(_, _, _, _, _, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = g.copy(child = hintChild)) +// Set operation is not allowed to be across. UNION/INTERCEPT/EXCEPT --- End diff -- Okay. But that was the reason it is not allowed there. Hmm, maybe it reads differently. Sorry.
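The rule quoted above turns `BroadcastHint` markers into `Hint` nodes. It then bubbles them up with `transformUp` until it reaches a `Project`, using one case per node type it may cross. The toy model below assumes a hypothetical mini plan ADT in which `Filter` stands in for every unary node the real rule enumerates. It uses a hand-written bottom-up traversal that, like `transformUp`, rewrites the children before the node itself.

```scala
// Toy model of the hint-normalization flow; not the SQLBuilder rule itself.
object BubbleSketch {
  sealed trait Plan
  final case class Table(name: String) extends Plan
  final case class Broadcast(child: Plan) extends Plan // like BroadcastHint
  final case class Hint(tables: Seq[String], child: Plan) extends Plan
  final case class Project(child: Plan) extends Plan
  final case class Filter(child: Plan) extends Plan    // any unary node
  final case class Join(left: Plan, right: Plan) extends Plan

  // One rewrite step, applied after the node's children were rewritten.
  private def rule(p: Plan): Plan = p match {
    case Broadcast(t @ Table(name)) => Hint(Seq(name), t)             // capture the hint
    case pr @ Project(Hint(_, _)) => pr                                // nearest Project reached
    case Hint(p1, Hint(p2, c)) => Hint(p1 ++ p2, c)                     // merge stacked hints
    case Join(Hint(p1, l), Hint(p2, r)) => Hint(p1 ++ p2, Join(l, r))  // merge across a join
    case Join(Hint(ps, l), r) => Hint(ps, Join(l, r))                  // bubble up from the left
    case Join(l, Hint(ps, r)) => Hint(ps, Join(l, r))                  // bubble up from the right
    case Filter(Hint(ps, c)) => Hint(ps, Filter(c))                    // cross a unary node
    case other => other
  }

  // Bottom-up traversal: children first, then the rule on the rebuilt node.
  def transformUp(p: Plan): Plan = rule(p match {
    case Broadcast(c) => Broadcast(transformUp(c))
    case Hint(ts, c) => Hint(ts, transformUp(c))
    case Project(c) => Project(transformUp(c))
    case Filter(c) => Filter(transformUp(c))
    case Join(l, r) => Join(transformUp(l), transformUp(r))
    case t: Table => t
  })
}
```

In this toy, a single generic case per node shape would replace the per-operator list. Whether that is feasible for Catalyst's node types is the question raised in the review.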
[GitHub] spark pull request #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14132#discussion_r71260947 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1774,6 +1775,51 @@ class Analyzer( } /** + * Substitute Hints. + * - BROADCAST/BROADCASTJOIN/MAPJOIN match the closest table with the given name parameters. + */ + object SubstituteHints extends Rule[LogicalPlan] { +def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case logical: LogicalPlan => logical transformDown { +case h @ Hint(name, parameters, child) +if Seq("BROADCAST", "BROADCASTJOIN", "MAPJOIN").contains(name.toUpperCase) => + var resolvedChild = child + + for (param <- parameters) { +val names = param.split("\\.") +val tid = if (names.length > 1) { + TableIdentifier(names(1), Some(names(0))) +} else { + TableIdentifier(param, None) +} +try { + catalog.lookupRelation(tid) + + var stop = false + resolvedChild = resolvedChild.transformDown { +case r @ BroadcastHint(SubqueryAlias(t, _)) + if !stop && resolver(t, tid.identifier) => + stop = true + r +case r @ SubqueryAlias(t, _) if !stop && resolver(t, tid.identifier) => + stop = true + BroadcastHint(r) --- End diff -- If we do not support `db.table`, why do we still compare the whole identifier?
[GitHub] spark pull request #13704: [SPARK-15985][SQL] Eliminate redundant cast from ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/13704#discussion_r71260242 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl._ +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types._ + +class SimplifyCastsSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = Batch("SimplifyCasts", FixedPoint(50), SimplifyCasts) :: Nil + } + + test("non-nullable to non-nullable array cast") { +val input = LocalRelation('a.array(ArrayType(IntegerType, false))) +val array_intPrimitive = 'a.array(ArrayType(IntegerType, false)) +val plan = input.select(array_intPrimitive --- End diff -- `val plan = input.select('a.cast(ArrayType(IntegerType, false)).as("casted")).analyze` You need to resolve the plan before optimizing it.
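The suite exercises `SimplifyCasts`. The real test must build and analyze a plan first, as the comment above says. Setting that aside, the rule's core decision can be modelled as below. This is a simplified sketch with hypothetical types: a cast is dropped when it is a no-op, or when it only relaxes array element nullability for the same element type. As far as we know, Catalyst's rule handles the analogous map case too, which is omitted here.

```scala
// Simplified model of the cast-elimination decision; types are hypothetical.
object SimplifyCastsSketch {
  sealed trait DataType
  case object IntT extends DataType
  case object StringT extends DataType
  final case class ArrayT(element: DataType, containsNull: Boolean) extends DataType

  sealed trait Expr { def dataType: DataType }
  final case class Attr(name: String, dataType: DataType) extends Expr
  final case class Cast(child: Expr, dataType: DataType) extends Expr

  def simplify(e: Expr): Expr = e match {
    case Cast(c, to) if c.dataType == to => c // no-op cast
    case cast @ Cast(c, to) => (c.dataType, to) match {
      // non-nullable elements -> nullable elements of the same type is safe
      case (ArrayT(from, false), ArrayT(toElem, true)) if from == toElem => c
      case _ => cast
    }
    case other => other
  }
}
```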
[GitHub] spark pull request #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14132#discussion_r71260158 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala --- @@ -425,6 +452,49 @@ class SQLBuilder(logicalPlan: LogicalPlan) extends Logging { } } +/** + * Merge and move upward to the nearest Project. + * A broadcast hint comment is scattered into multiple nodes inside the plan, and the + * information of BroadcastHint resides its current position inside the plan. In order to + * reconstruct broadcast hint comment, we need to pack the information of BroadcastHint into + * Hint("BROADCAST", _, _) and collect them up by moving upward to the nearest Project node. + */ +object NormalizeBroadcastHint extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +// Capture the broadcasted information and store it in Hint. +case BroadcastHint(child @ SubqueryAlias(_, Project(_, SQLTable(database, table, _, _ => + Hint("BROADCAST", Seq(table), child) + +// Nearest Project is found. +case p @ Project(_, Hint(_, _, _)) => p + +// Merge BROADCAST hints up to the nearest Project. +case Hint("BROADCAST", params1, h @ Hint("BROADCAST", params2, _)) => + h.copy(parameters = params1 ++ params2) +case j @ Join(h1 @ Hint("BROADCAST", p1, left), h2 @ Hint("BROADCAST", p2, right), _, _) => + h1.copy(parameters = p1 ++ p2, child = j.copy(left = left, right = right)) + +// Bubble up BROADCAST hints to the nearest Project. 
+case j @ Join(h @ Hint("BROADCAST", _, hintChild), _, _, _) => + h.copy(child = j.copy(left = hintChild)) +case j @ Join(_, h @ Hint("BROADCAST", _, hintChild), _, _) => + h.copy(child = j.copy(right = hintChild)) +case s @ SubqueryAlias(_, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = s.copy(child = hintChild)) +case ll @ LocalLimit(_, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = ll.copy(child = hintChild)) +case f @ Filter(_, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = f.copy(child = hintChild)) +case a @ Aggregate(_, _, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = a.copy(child = hintChild)) +case s @ Sort(_, _, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = s.copy(child = hintChild)) +case g @ Generate(_, _, _, _, _, h @ Hint("BROADCAST", _, hintChild)) => + h.copy(child = g.copy(child = hintChild)) +// Set operation is not allowed to be across. UNION/INTERCEPT/EXCEPT --- End diff -- Your reply is different from the comment in the code.
[GitHub] spark pull request #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14132#discussion_r71259834 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -87,6 +87,7 @@ class Analyzer( EliminateUnions), Batch("Resolution", fixedPoint, ResolveRelations :: + SubstituteHints :: --- End diff -- In each batch, the order of rules should not matter. Here, however, the rule `SubstituteHints` impacts the other rules. Please fix it.
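The concern rests on how batches run. Catalyst's `RuleExecutor` applies a batch's rules in sequence and, for a `FixedPoint` batch, repeats until the plan stops changing or the iteration limit is reached. The simplified model of that loop below is not Spark's code. It shows that cooperating rules reach the same result in either order, while two rules competing for the same input make the result depend on their order.

```scala
// Simplified fixed-point batch runner; not Catalyst's RuleExecutor.
object FixedPointSketch {
  type Rule[T] = T => T

  // Apply the rules in order, repeating until nothing changes or the
  // iteration limit is hit.
  def execute[T](start: T, rules: Seq[Rule[T]], maxIterations: Int = 100): T = {
    var current = start
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      val next = rules.foldLeft(current)((acc, rule) => rule(acc))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }
}
```

For example, `distinct` and `sorted` on a list converge to the same result in either order. Two rules that both rewrite the value `4`, one to `2` and one to `3`, give different results depending on which runs first.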
[GitHub] spark issue #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/14132 I see. For `NormalizeBroadcastHint`, I will try to minimize the cases.
[GitHub] spark pull request #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14132#discussion_r71259759 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1774,6 +1775,51 @@ class Analyzer( } /** + * Substitute Hints. + * - BROADCAST/BROADCASTJOIN/MAPJOIN match the closest table with the given name parameters. + */ + object SubstituteHints extends Rule[LogicalPlan] { +def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case logical: LogicalPlan => logical transformDown { +case h @ Hint(name, parameters, child) +if Seq("BROADCAST", "BROADCASTJOIN", "MAPJOIN").contains(name.toUpperCase) => + var resolvedChild = child + + for (param <- parameters) { +val names = param.split("\\.") --- End diff -- Sorry, I do not think this is beyond the scope. Instead, this is a bug. This identifier stores a table identifier. If we do not use the same parsing solution, the result will be wrong.
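The naive `split("\\.")` mishandles back-quoted names that contain dots, and it silently drops any parts beyond the second. In Spark the usual route is the SQL parser's table-identifier parsing (for example `parseTableIdentifier`). The sketch below only illustrates the difference. It uses a hypothetical hand-written splitter that treats dots inside back-quotes as part of the name and a doubled back-quote as a literal one.

```scala
// Illustration only: naive split vs. a hypothetical quote-aware splitter.
object IdentSketch {
  // The approach in the diff.
  def naive(param: String): (Option[String], String) = {
    val names = param.split("\\.")
    if (names.length > 1) (Some(names(0)), names(1)) else (None, param)
  }

  // Dots inside `...` belong to the name; `` inside quotes is a literal back-quote.
  def splitIdentifier(s: String): Seq[String] = {
    val parts = scala.collection.mutable.ArrayBuffer.empty[String]
    val cur = new StringBuilder
    var inQuotes = false
    var i = 0
    while (i < s.length) {
      val c = s.charAt(i)
      if (c == '`') {
        if (inQuotes && i + 1 < s.length && s.charAt(i + 1) == '`') {
          cur += '`'
          i += 1
        } else inQuotes = !inQuotes
      } else if (c == '.' && !inQuotes) {
        parts += cur.toString
        cur.clear()
      } else cur += c
      i += 1
    }
    require(!inQuotes, s"unclosed back-quote in: $s")
    parts += cur.toString
    parts.toList
  }

  def parse(param: String): (Option[String], String) = splitIdentifier(param) match {
    case Seq(table) => (None, table)
    case Seq(db, table) => (Some(db), table)
    case _ => throw new IllegalArgumentException(s"invalid table identifier: $param")
  }
}
```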
[GitHub] spark issue #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14132 **[Test build #62500 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62500/consoleFull)** for PR 14132 at commit [`5ba2ad7`](https://github.com/apache/spark/commit/5ba2ad7aa6cab364e09a2c0dae529b8270aed153).
[GitHub] spark pull request #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14132#discussion_r71259501 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala --- @@ -425,6 +452,49 @@ class SQLBuilder(logicalPlan: LogicalPlan) extends Logging { } } +/** + * Merge and move upward to the nearest Project. + * A broadcast hint comment is scattered into multiple nodes inside the plan, and the + * information of BroadcastHint resides its current position inside the plan. In order to + * reconstruct broadcast hint comment, we need to pack the information of BroadcastHint into + * Hint("BROADCAST", _, _) and collect them up by moving upward to the nearest Project node. + */ +object NormalizeBroadcastHint extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +// Capture the broadcasted information and store it in Hint. +case BroadcastHint(child @ SubqueryAlias(_, Project(_, SQLTable(database, table, _, _ => + Hint("BROADCAST", Seq(table), child) + +// Nearest Project is found. +case p @ Project(_, Hint(_, _, _)) => p + +// Merge BROADCAST hints up to the nearest Project. +case Hint("BROADCAST", params1, h @ Hint("BROADCAST", params2, _)) => + h.copy(parameters = params1 ++ params2) +case j @ Join(h1 @ Hint("BROADCAST", p1, left), h2 @ Hint("BROADCAST", p2, right), _, _) => + h1.copy(parameters = p1 ++ p2, child = j.copy(left = left, right = right)) + +// Bubble up BROADCAST hints to the nearest Project. --- End diff -- If you read what we did in SQLBuilder, you will see that this is not how we normally do it. @rxin gave the same comment below: https://github.com/apache/spark/pull/14132#issuecomment-233503432. A whitelist is hard to maintain; I can still find more missing cases here.
[GitHub] spark issue #14241: [SPARK-16596] [SQL] Refactor DataSourceScanExec to do pa...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/14241 @ericl I was talking with @marmbrus -- it'd be better to create an API in the physical scan operator that accepts a list of filters, and then do pruning there. That is to say, we also want to move all the pruning code from physical planning into the physical operators.
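The suggestion is that the physical scan operator should accept a list of filters and do the pruning itself, instead of pruning during physical planning. The sketch below is a hypothetical illustration of such an API. `FileScan`, `Partition`, `withPartitionFilters`, and the filter classes are invented names, not Spark's classes. In this toy, a filter on a column that is not a partition column never prunes anything.

```scala
// Hypothetical scan operator that owns partition pruning; not Spark's API.
object ScanSketch {
  sealed trait Filter
  final case class EqualTo(column: String, value: String) extends Filter
  final case class In(column: String, values: Set[String]) extends Filter

  final case class Partition(values: Map[String, String], files: Seq[String])

  final case class FileScan(partitions: Seq[Partition], partitionFilters: Seq[Filter] = Nil) {
    // The planner only hands over filters; pruning happens inside the operator.
    def withPartitionFilters(filters: Seq[Filter]): FileScan = copy(partitionFilters = filters)

    // A partition without the filtered column cannot be ruled out, so it is kept.
    private def matches(p: Partition, f: Filter): Boolean = f match {
      case EqualTo(c, v) => p.values.get(c).forall(_ == v)
      case In(c, vs) => p.values.get(c).forall(vs.contains)
    }

    def selectedFiles: Seq[String] =
      partitions.filter(p => partitionFilters.forall(matches(p, _))).flatMap(_.files)
  }
}
```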
[GitHub] spark issue #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user rxin commented on the issue: https://github.com/apache/spark/pull/14132 I was referring to NormalizeBroadcastHint -- there are many cases in there and it seems error-prone with respect to future changes. Do we need all those rules?
[GitHub] spark pull request #14252: [SPARK-16615][SQL] Expose sqlContext in SparkSess...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14252
[GitHub] spark issue #14252: [SPARK-16615][SQL] Expose sqlContext in SparkSession
Github user rxin commented on the issue: https://github.com/apache/spark/pull/14252 Merging in master/2.0.
[GitHub] spark issue #14251: [SPARK-16602][SQL] `Nvl` function should support various...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14251 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62499/ Test PASSed.
[GitHub] spark issue #14251: [SPARK-16602][SQL] `Nvl` function should support various...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14251 Merged build finished. Test PASSed.
[GitHub] spark issue #14251: [SPARK-16602][SQL] `Nvl` function should support various...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14251 **[Test build #62499 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62499/consoleFull)** for PR 14251 at commit [`85e3144`](https://github.com/apache/spark/commit/85e31447c16c4879ba0149533f21b1e57ffe3186).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite ...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/14235#discussion_r71257848

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala ---
@@ -17,15 +17,33 @@

 package org.apache.spark.sql.catalyst

+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, NoSuchFileException, Paths}
+
 import scala.util.control.NonFatal

 import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SQLTestUtils

+/**
+ * A test suite for LogicalPlan-to-SQL conversion.
+ *
+ * Each query has a golden generated SQL file in test/resources/sqlgen. The test suite also has
+ * built-in functionality to automatically generate these golden files.
+ *
+ * To re-generate golden files, run:
+ *    SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "hive/test-only *LogicalPlanToSQLSuite"
+ */
 class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   import testImplicits._

+  // Used for generating new query answer files by saving
+  private val regenerateGoldenFiles =
+    Option(System.getenv("SPARK_GENERATE_GOLDEN_FILES")).contains("1")
--- End diff --

Yep. I have nothing to say. My bad. Sorry about this. :(
[GitHub] spark pull request #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/14235#discussion_r71257655

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala ---
@@ -17,15 +17,33 @@

 package org.apache.spark.sql.catalyst

+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, NoSuchFileException, Paths}
+
 import scala.util.control.NonFatal

 import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SQLTestUtils

+/**
+ * A test suite for LogicalPlan-to-SQL conversion.
+ *
+ * Each query has a golden generated SQL file in test/resources/sqlgen. The test suite also has
+ * built-in functionality to automatically generate these golden files.
+ *
+ * To re-generate golden files, run:
+ *    SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "hive/test-only *LogicalPlanToSQLSuite"
+ */
 class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   import testImplicits._

+  // Used for generating new query answer files by saving
+  private val regenerateGoldenFiles =
+    Option(System.getenv("SPARK_GENERATE_GOLDEN_FILES")).contains("1")
--- End diff --

I fixed it here https://github.com/apache/spark/commit/c4524f5193e1b3ce1c56c5aed126f4121ce26d23
[GitHub] spark pull request #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/14235#discussion_r71257369

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala ---
@@ -17,15 +17,33 @@

 package org.apache.spark.sql.catalyst

+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, NoSuchFileException, Paths}
+
 import scala.util.control.NonFatal

 import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SQLTestUtils

+/**
+ * A test suite for LogicalPlan-to-SQL conversion.
+ *
+ * Each query has a golden generated SQL file in test/resources/sqlgen. The test suite also has
+ * built-in functionality to automatically generate these golden files.
+ *
+ * To re-generate golden files, run:
+ *    SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "hive/test-only *LogicalPlanToSQLSuite"
+ */
 class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   import testImplicits._

+  // Used for generating new query answer files by saving
+  private val regenerateGoldenFiles =
+    Option(System.getenv("SPARK_GENERATE_GOLDEN_FILES")).contains("1")
--- End diff --

Why did you use contains here? This is super confusing and also broke 2.10. I think I asked to do comparison with Some("1"). In most cases it is a very bad idea to use collection-oriented methods on Options, because they make the code more confusing.
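For reference, a minimal sketch of the comparison rxin asks for, assuming the goal is source compatibility with both Scala 2.10 and 2.11 (`Option#contains` only exists from Scala 2.11 on, which is why it broke the 2.10 build). The object `GoldenFilesFlag` and its methods are hypothetical names used for illustration, not Spark code.

```scala
// Hypothetical helper, not part of Spark.
object GoldenFilesFlag {
  // Comparing against Some("1") compiles on Scala 2.10 and later,
  // unlike Option#contains, which was only added in Scala 2.11.
  // Option(null) is None, so an unset variable yields false.
  def isEnabled(rawValue: String): Boolean = Option(rawValue) == Some("1")

  // Reads the environment variable and applies the same check.
  def fromEnv(name: String): Boolean = isEnabled(System.getenv(name))
}
```

In the suite this would amount to `Option(System.getenv("SPARK_GENERATE_GOLDEN_FILES")) == Some("1")`, written inline or through a helper like the one above.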
[GitHub] spark issue #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashMapGener...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14174 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62498/ Test PASSed.
[GitHub] spark issue #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashMapGener...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14174 Merged build finished. Test PASSed.
[GitHub] spark issue #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashMapGener...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14174 **[Test build #62498 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62498/consoleFull)** for PR 14174 at commit [`b04ff88`](https://github.com/apache/spark/commit/b04ff887ac10ee2f9bdd66c0c83b007521b5016c).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #14252: [SPARK-16615][SQL] Expose sqlContext in SparkSession
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14252 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62496/ Test PASSed.
[GitHub] spark issue #14252: [SPARK-16615][SQL] Expose sqlContext in SparkSession
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14252 Merged build finished. Test PASSed.
[GitHub] spark issue #14252: [SPARK-16615][SQL] Expose sqlContext in SparkSession
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14252 **[Test build #62496 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62496/consoleFull)** for PR 14252 at commit [`1f66189`](https://github.com/apache/spark/commit/1f661893b83e182c41d1a57dbdfab85f511765ae).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashMapGener...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14174 Merged build finished. Test PASSed.
[GitHub] spark issue #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashMapGener...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14174 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62497/ Test PASSed.
[GitHub] spark issue #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashMapGener...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14174 **[Test build #62497 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62497/consoleFull)** for PR 14174 at commit [`d939488`](https://github.com/apache/spark/commit/d9394888977c97fe95f1642ad9f613dcbee1e4fa).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #14204: [SPARK-16520] [WEBUI] Link executors to corresponding wo...
Github user nblintao commented on the issue: https://github.com/apache/spark/pull/14204 @ajbozarth Thanks for pointing out how to fix this. I'll try to fix it tomorrow.
[GitHub] spark issue #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/14132 First of all, I will update this right now. Second, what do you mean by 10+ rules?
[GitHub] spark issue #14022: [SPARK-16272][core] Allow config values to reference con...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14022 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62494/ Test PASSed.
[GitHub] spark issue #14022: [SPARK-16272][core] Allow config values to reference con...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14022 Merged build finished. Test PASSed.
[GitHub] spark issue #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite to chec...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/14235 Sure. I've been looking into that. It's on my list. I'll make a JIRA issue and proceed.
[GitHub] spark issue #14022: [SPARK-16272][core] Allow config values to reference con...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14022 **[Test build #62494 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62494/consoleFull)** for PR 14022 at commit [`ed5c18b`](https://github.com/apache/spark/commit/ed5c18baddbd7ceb4157f5a31bf150d2ef9e7d19).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user rxin commented on the issue: https://github.com/apache/spark/pull/14132 Is there a better way to handle sql generation that's not adding 10+ rules?
[GitHub] spark issue #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user rxin commented on the issue: https://github.com/apache/spark/pull/14132 Can you update this now that the SQL generation pull request has been merged?
[GitHub] spark issue #14252: [SPARK-16615][SQL] Expose sqlContext in SparkSession
Github user marmbrus commented on the issue: https://github.com/apache/spark/pull/14252 LGTM
[GitHub] spark issue #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite to chec...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/14235 @dongjoon-hyun can you also look into having stable identifiers for gen_attr? Right now the golden files look really weird because gen_attr is used more than once.
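One possible approach to stable identifiers, sketched under the assumption that the raw generated names have the form `gen_attr_<id>` with a numeric id that varies from run to run: renumber the ids in order of first appearance, so each attribute keeps one distinct name within a query while the golden output stops depending on the raw ids. `GenAttrNormalizer` is a hypothetical name, not existing Spark code.

```scala
import scala.collection.mutable
import scala.util.matching.Regex

// Hypothetical helper, not part of Spark.
object GenAttrNormalizer {
  // Assumed shape of generated attribute names, e.g. gen_attr_17.
  private val GenAttr: Regex = """gen_attr_(\d+)""".r

  // Rewrites every gen_attr_<id> to gen_attr_<n>, where n is the order in which
  // the raw id was first seen (0, 1, 2, ...). Repeated ids map to the same n.
  def normalize(sql: String): String = {
    val stableIds = mutable.HashMap.empty[String, Int]
    GenAttr.replaceAllIn(sql, (m: Regex.Match) => {
      val rawId = m.group(1)
      val stableId = stableIds.get(rawId) match {
        case Some(existing) => existing
        case None =>
          val next = stableIds.size
          stableIds(rawId) = next
          next
      }
      s"gen_attr_$stableId"
    })
  }
}
```

Because numbering follows textual order, two runs that produce the same query shape with different raw ids normalize to identical text.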
[GitHub] spark issue #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite to chec...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/14235 Oh, thank you for merging, @rxin! Also, thank you for the reviews, @gatorsmile and @liancheng.
[GitHub] spark issue #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashMapGener...
Github user sameeragarwal commented on the issue: https://github.com/apache/spark/pull/14174 This looks pretty solid! I left some non-critical comments around readability and structuring. I'll take a pass on the test suites once those are addressed.
[GitHub] spark pull request #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14235
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71254476

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, DeclarativeAggregate}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.types._
+
+/**
+ * This is a helper class to generate an append-only row-based hash map that can act as a 'cache'
+ * for extremely fast key-value lookups while evaluating aggregates (and fall back to the
+ * `BytesToBytesMap` if a given key isn't found). This is 'codegened' in HashAggregate to speed
+ * up aggregates w/ key.
+ *
+ * We also have VectorizedHashMapGenerator, which generates a append-only vectorized hash map.
+ * We choose one of the two as the 1st level, fast hash map during aggregation.
+ *
+ * NOTE: This row-based hash map currently doesn't support nullable keys and falls back to the
+ * `BytesToBytesMap` to store them.
+ */
+class RowBasedHashMapGenerator(
--- End diff --

One thing that might be nice to do (perhaps as part of a small followup PR) is to reduce the code duplication between this and the `VectorizedHashMapGenerator` by having these 2 just extend a base class `HashMapGenerator`.
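A minimal sketch of the suggested refactoring, assuming the two generators differ only in a few code-emitting hooks while the surrounding skeleton is shared. Only the base-class name `HashMapGenerator` comes from the comment; the hook names (`generateEquals`, `generateFindOrInsert`), the subclass names, and the emitted strings are illustrative assumptions, not the actual generator code.

```scala
// Hypothetical sketch: the shared skeleton lives in the base class and
// subclasses only supply the fragments that differ.
abstract class HashMapGenerator(val generatedClassName: String) {
  // Fragments that differ between the row-based and vectorized maps.
  protected def generateEquals(): String
  protected def generateFindOrInsert(): String

  // Common structure, written once instead of duplicated in each generator.
  final def generate(): String =
    s"""public class $generatedClassName {
       |${generateEquals()}
       |${generateFindOrInsert()}
       |}""".stripMargin
}

class RowBasedGeneratorSketch(name: String) extends HashMapGenerator(name) {
  protected def generateEquals(): String = "  // compare keys stored in UnsafeRows"
  protected def generateFindOrInsert(): String = "  // append key/value to a row batch"
}

class VectorizedGeneratorSketch(name: String) extends HashMapGenerator(name) {
  protected def generateEquals(): String = "  // compare keys stored in column vectors"
  protected def generateFindOrInsert(): String = "  // append key/value to a columnar batch"
}
```

With this template-method shape, each concrete generator shrinks to the parts that genuinely differ, and fixes to the shared skeleton apply to both maps at once.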
[GitHub] spark pull request #14252: [SPARK-16615][SQL] Expose sqlContext in SparkSess...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/14252#discussion_r71254442

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---
@@ -115,9 +115,11 @@ class SparkSession private(
   /**
    * A wrapped version of this session in the form of a [[SQLContext]], for backward compatibility.
+   *
+   * @since 2.0.0
--- End diff --

Yea RC4 didn't pass - will kick off rc5.
[GitHub] spark issue #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite to chec...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/14235 Thanks - merging in master / 2.0. I'm also merging this in 2.0 since it is a test only change and will reduce merge conflicts.
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71254165 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; + private Object currentAndOnlyBase = null; + private long recordStartOffset; + private long pageCursor = 0; + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, DEFAULT_CAPACITY, manager); + } + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager, int maxRows) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager); + } + + public int numRows() { return numRows; } + + public void close() { +if (currentAndOnlyPage != null) { + freePage(currentAndOnlyPage); + currentAndOnlyPage = null; +} + } + + private boolean acquireNewPage(long required) { +try { + currentAndOnlyPage = allocatePage(required); +} catch (OutOfMemoryError e) { + return false; +} +currentAndOnlyBase = currentAndOnlyPage.getBaseObject(); +Platform.putInt(currentAndOnlyBase, currentAndOnlyPage.getBaseOffset(), 0); +pageCursor = 4; +recordStartOffset = pageCursor + currentAndOnlyPage.getBaseOffset(); + 
+return true; + } + + private long getKeyOffsetForFixedLengthRecords(int rowId) { +return recordStartOffset + rowId * recordLength + 8; + } + + public UnsafeRow appendRow(Object kbase, long koff, int klen, + Object vbase, long voff, int vlen) { +final long recordLength = 8 + klen + vlen + 8; +// if run out of max supported rows or page size, return null +
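The `acquireNewPage` method quoted above reserves the first 4 bytes of a freshly allocated page, writes an int 0 there, and starts records at offset 4. If `allocatePage` throws `OutOfMemoryError`, it returns `false` instead. Here is a minimal sketch of that pattern on a heap `ByteBuffer`. It assumes a pluggable allocator stands in for `MemoryConsumer.allocatePage`. The class `SinglePageSketch` and its members are hypothetical and are not Spark code.

```java
import java.nio.ByteBuffer;
import java.util.function.IntFunction;

// Hypothetical sketch (not Spark code) of the single-page setup in acquireNewPage.
final class SinglePageSketch {
  static final int PAGE_HEADER_BYTES = 4;

  private final IntFunction<ByteBuffer> allocator; // stands in for allocatePage (assumption)
  private ByteBuffer page;                          // stands in for currentAndOnlyPage
  private long pageCursor;                          // next free byte, relative to page start
  private long recordStartOffset;                   // where record 0 begins

  SinglePageSketch(IntFunction<ByteBuffer> allocator) {
    this.allocator = allocator;
  }

  boolean acquireNewPage(int required) {
    try {
      page = allocator.apply(required);
    } catch (OutOfMemoryError e) {
      // Report failure to the caller instead of propagating the error.
      return false;
    }
    page.putInt(0, 0);              // zero the 4-byte page header
    pageCursor = PAGE_HEADER_BYTES; // records start right after the header
    recordStartOffset = pageCursor; // a heap ByteBuffer has base offset 0
    return true;
  }

  long pageCursor() { return pageCursor; }
  long recordStartOffset() { return recordStartOffset; }
  ByteBuffer page() { return page; }

  public static void main(String[] args) {
    SinglePageSketch ok = new SinglePageSketch(ByteBuffer::allocate);
    System.out.println(ok.acquireNewPage(1024)); // prints true
    SinglePageSketch oom = new SinglePageSketch(n -> { throw new OutOfMemoryError("simulated"); });
    System.out.println(oom.acquireNewPage(1024)); // prints false
  }
}
```

Returning `false` lets the caller decide how to react to memory pressure instead of failing outright. The quoted diff does not show what the PR's callers actually do with that result.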
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71254034 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; + private Object currentAndOnlyBase = null; + private long recordStartOffset; + private long pageCursor = 0; + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, DEFAULT_CAPACITY, manager); + } + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager, int maxRows) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager); + } + + public int numRows() { return numRows; } + + public void close() { +if (currentAndOnlyPage != null) { + freePage(currentAndOnlyPage); + currentAndOnlyPage = null; +} + } + + private boolean acquireNewPage(long required) { +try { + currentAndOnlyPage = allocatePage(required); +} catch (OutOfMemoryError e) { + return false; +} +currentAndOnlyBase = currentAndOnlyPage.getBaseObject(); +Platform.putInt(currentAndOnlyBase, currentAndOnlyPage.getBaseOffset(), 0); +pageCursor = 4; +recordStartOffset = pageCursor + currentAndOnlyPage.getBaseOffset(); + 
+return true; + } + + private long getKeyOffsetForFixedLengthRecords(int rowId) { +return recordStartOffset + rowId * recordLength + 8; + } + + public UnsafeRow appendRow(Object kbase, long koff, int klen, + Object vbase, long voff, int vlen) { +final long recordLength = 8 + klen + vlen + 8; +// if run out of max supported rows or page size, return null +
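The Javadoc above gives the record layout as [4-byte total size][4-byte key size][key][value][8-byte next pointer]. Each record therefore occupies 8 + klen + vlen + 8 bytes, and `getKeyOffsetForFixedLengthRecords` finds the key of `rowId` at `recordStartOffset + rowId * recordLength + 8`. Below is a sketch of the same arithmetic. It assumes every key and every value has a fixed length. `FixedLengthRecordLayout` is a hypothetical helper, not part of the PR.

```java
// Hypothetical helper (not Spark code): offsets for fixed-length records in the
// RowBasedKeyValueBatch record format.
final class FixedLengthRecordLayout {
  static final int HEADER_BYTES = 8;  // 4-byte total size + 4-byte key size
  static final int POINTER_BYTES = 8; // 8-byte pointer to the next record

  final int klen;
  final int vlen;
  final int recordLength;
  final long recordStartOffset;

  FixedLengthRecordLayout(long recordStartOffset, int klen, int vlen) {
    this.recordStartOffset = recordStartOffset;
    this.klen = klen;
    this.vlen = vlen;
    this.recordLength = HEADER_BYTES + klen + vlen + POINTER_BYTES;
  }

  // Value stored in the 4-byte "total size" field, per the Javadoc: klen + vlen + 4.
  int totalSizeField() {
    return klen + vlen + 4;
  }

  long recordOffset(int rowId) {
    // Widen to long before multiplying so a large rowId cannot overflow int arithmetic.
    return recordStartOffset + (long) rowId * recordLength;
  }

  long keyOffset(int rowId) {
    return recordOffset(rowId) + HEADER_BYTES;
  }

  long valueOffset(int rowId) {
    return keyOffset(rowId) + klen;
  }

  long nextPointerOffset(int rowId) {
    return valueOffset(rowId) + vlen;
  }

  public static void main(String[] args) {
    FixedLengthRecordLayout layout = new FixedLengthRecordLayout(4, 16, 24);
    System.out.println("recordLength = " + layout.recordLength); // 8 + 16 + 24 + 8 = 56
    System.out.println("key of row 2 at " + layout.keyOffset(2)); // 4 + 2 * 56 + 8 = 124
  }
}
```

This sketch widens `rowId` to `long` before multiplying. In the quoted method, both `rowId` and `recordLength` are `int`, so the product is computed in `int` arithmetic before it is added to the `long` offset.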
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253992 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@
[GitHub] spark pull request #12944: [SPARK-15074][Shuffle] Cache shuffle index file t...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12944#discussion_r71254024 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +/** + * Contains offset and length of the shuffle block data. + */ +public class ShuffleIndexRecord { + private final long offset; + private final long length; + + public ShuffleIndexRecord(long offset, long length) { +this.offset = offset; +this.length = length; + } + + public long getOffset() { +return offset; + } + + public long getLength() { +return length; + } +} --- End diff -- It seems it was rebased but I guess I meant below: ``` -} +} \ No newline at end of file ``` meaning ![2016-07-19 9 11 19](https://cloud.githubusercontent.com/assets/6477701/16934391/d5711740-4d90-11e6-9012-f604747ad4d2.png)
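The quoted `ShuffleIndexRecord` only carries an offset and a length. Here is a sketch of how such records could be derived from a shuffle index. It assumes the index stores numPartitions + 1 cumulative offsets as big-endian longs, so block i spans [offsets[i], offsets[i + 1]). This mirrors how Spark's `IndexShuffleBlockResolver` lays out index files, but treat it as an assumption of the sketch. `ShuffleIndexLookup` is hypothetical, and the nested record class is a local copy of the quoted one.

```java
import java.nio.ByteBuffer;
import java.nio.LongBuffer;

// Hypothetical lookup (not the PR's code) over an in-memory copy of an index file.
final class ShuffleIndexLookup {
  // Local copy of the quoted ShuffleIndexRecord: offset and length of one block.
  static final class ShuffleIndexRecord {
    private final long offset;
    private final long length;

    ShuffleIndexRecord(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }

    long getOffset() { return offset; }
    long getLength() { return length; }
  }

  private final LongBuffer offsets;

  ShuffleIndexLookup(byte[] indexFileBytes) {
    // ByteBuffer is big-endian by default, matching DataOutputStream.writeLong.
    this.offsets = ByteBuffer.wrap(indexFileBytes).asLongBuffer();
  }

  int numPartitions() {
    return offsets.limit() - 1; // numPartitions + 1 offsets are stored
  }

  ShuffleIndexRecord getIndex(int reduceId) {
    long offset = offsets.get(reduceId);
    long nextOffset = offsets.get(reduceId + 1);
    return new ShuffleIndexRecord(offset, nextOffset - offset);
  }

  public static void main(String[] args) {
    byte[] index = ByteBuffer.allocate(4 * Long.BYTES)
        .putLong(0L).putLong(100L).putLong(100L).putLong(250L).array();
    ShuffleIndexLookup lookup = new ShuffleIndexLookup(index);
    ShuffleIndexRecord r = lookup.getIndex(2);
    System.out.println(r.getOffset() + " " + r.getLength()); // prints "100 150"
  }
}
```

Caching the decoded offsets, as in this sketch, means each lookup costs two buffer reads instead of reopening and seeking in the index file.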
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253933 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253903 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253796 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@
[GitHub] spark issue #14036: [SPARK-16323] [SQL] Add IntegerDivide to avoid unnecessa...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/14036 @techaddict Can you test the performance with and without your change?
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user ooq commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253773 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; + private Object currentAndOnlyBase = null; + private long recordStartOffset; + private long pageCursor = 0; + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, DEFAULT_CAPACITY, manager); + } + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager, int maxRows) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager); + } + + public int numRows() { return numRows; } + + public void close() { +if (currentAndOnlyPage != null) { + freePage(currentAndOnlyPage); + currentAndOnlyPage = null; +} + } + + private boolean acquireNewPage(long required) { +try { + currentAndOnlyPage = allocatePage(required); +} catch (OutOfMemoryError e) { + return false; +} +currentAndOnlyBase = currentAndOnlyPage.getBaseObject(); +Platform.putInt(currentAndOnlyBase, currentAndOnlyPage.getBaseOffset(), 0); +pageCursor = 4; +recordStartOffset = pageCursor + currentAndOnlyPage.getBaseOffset(); + 
+return true; + } + + private long getKeyOffsetForFixedLengthRecords(int rowId) { +return recordStartOffset + rowId * recordLength + 8; + } + + public UnsafeRow appendRow(Object kbase, long koff, int klen, + Object vbase, long voff, int vlen) { +final long recordLength = 8 + klen + vlen + 8; +// if run out of max supported rows or page size, return null +if (numRow
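The quoted `appendRow` computes `recordLength = 8 + klen + vlen + 8`. Per its comment, it returns `null` once the batch runs out of row slots or page space, but the condition itself is cut off at `if (numRow`. Below is a sketch of such a guard together with the write it protects, using a heap `ByteBuffer`. The `AppendSketch` class is hypothetical, its check is an assumption rather than the PR's actual condition, and it returns -1 where the quoted code returns `null`.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch (not the PR's code): append records in the RowBasedKeyValueBatch
// format, refusing the append when row slots or page space run out.
final class AppendSketch {
  private final ByteBuffer page;
  private final int capacity;
  private int numRows = 0;
  // The first 4 bytes of the page are reserved, mirroring pageCursor = 4 in acquireNewPage.
  private int pageCursor = 4;

  AppendSketch(int pageSize, int capacity) {
    this.page = ByteBuffer.allocate(pageSize);
    this.capacity = capacity;
  }

  /** Appends one record and returns the key's offset in the page, or -1 if it does not fit. */
  int append(byte[] key, byte[] value) {
    final long recordLength = 8L + key.length + value.length + 8L;
    // Out of row slots or page space: signal failure instead of throwing.
    if (numRows >= capacity || pageCursor + recordLength > page.capacity()) {
      return -1;
    }
    page.putInt(pageCursor, key.length + value.length + 4); // total size field
    page.putInt(pageCursor + 4, key.length);                 // key size field
    int keyOffset = pageCursor + 8;
    for (int i = 0; i < key.length; i++) {
      page.put(keyOffset + i, key[i]);
    }
    int valueOffset = keyOffset + key.length;
    for (int i = 0; i < value.length; i++) {
      page.put(valueOffset + i, value[i]);
    }
    page.putLong(valueOffset + value.length, 0L);            // next-record pointer, unused here
    pageCursor += (int) recordLength;
    numRows++;
    return keyOffset;
  }

  int numRows() {
    return numRows;
  }

  public static void main(String[] args) {
    AppendSketch batch = new AppendSketch(64, 2);
    System.out.println(batch.append(new byte[4], new byte[4])); // prints 12
    System.out.println(batch.append(new byte[4], new byte[4])); // prints 36
    System.out.println(batch.append(new byte[4], new byte[4])); // prints -1 (capacity reached)
  }
}
```

Checking both limits before writing anything means a rejected append leaves the page and the row count untouched.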
[GitHub] spark issue #14251: [SPARK-16602][SQL] `Nvl` function should support various...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14251 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62495/ Test FAILed.
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253765 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; + private Object currentAndOnlyBase = null; + private long recordStartOffset; + private long pageCursor = 0; + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, DEFAULT_CAPACITY, manager); + } + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager, int maxRows) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager); + } + + public int numRows() { return numRows; } + + public void close() { +if (currentAndOnlyPage != null) { + freePage(currentAndOnlyPage); + currentAndOnlyPage = null; +} + } + + private boolean acquireNewPage(long required) { +try { + currentAndOnlyPage = allocatePage(required); +} catch (OutOfMemoryError e) { + return false; +} +currentAndOnlyBase = currentAndOnlyPage.getBaseObject(); +Platform.putInt(currentAndOnlyBase, currentAndOnlyPage.getBaseOffset(), 0); +pageCursor = 4; +recordStartOffset = pageCursor + currentAndOnlyPage.getBaseOffset(); + 
+return true; + } + + private long getKeyOffsetForFixedLengthRecords(int rowId) { +return recordStartOffset + rowId * recordLength + 8; + } + + public UnsafeRow appendRow(Object kbase, long koff, int klen, + Object vbase, long voff, int vlen) { +final long recordLength = 8 + klen + vlen + 8; +// if run out of max supported rows or page size, return null +
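The record layout in the class doc above implies some fixed offset arithmetic. The sketch below makes it concrete. It relies only on what the doc states: an 8-byte header holding the total-size and key-size ints, then the key and value UnsafeRow bytes, then an 8-byte next pointer. `RecordLayout` and its methods are hypothetical helpers, not part of the PR.

```java
/** Hypothetical helper: offsets implied by the documented record layout. */
final class RecordLayout {
  // 4-byte total-size field + 4-byte key-size field.
  static final int HEADER_BYTES = 8;
  // Trailing pointer to the next record.
  static final int NEXT_POINTER_BYTES = 8;

  /** Bytes occupied by one record: 8 + klen + vlen + 8. */
  static long recordLength(int klen, int vlen) {
    return (long) HEADER_BYTES + klen + vlen + NEXT_POINTER_BYTES;
  }

  /** The key UnsafeRow starts right after the 8-byte header. */
  static long keyOffset(long recordStart) {
    return recordStart + HEADER_BYTES;
  }

  /** The value UnsafeRow follows the key bytes directly. */
  static long valueOffset(long recordStart, int klen) {
    return keyOffset(recordStart) + klen;
  }

  /** The next-record pointer follows the value bytes. */
  static long nextPointerOffset(long recordStart, int klen, int vlen) {
    return valueOffset(recordStart, klen) + vlen;
  }

  public static void main(String[] args) {
    int klen = 24, vlen = 16; // example sizes only
    System.out.println(recordLength(klen, vlen));          // 56
    System.out.println(valueOffset(0L, klen));             // 32
    System.out.println(nextPointerOffset(0L, klen, vlen)); // 48
  }
}
```

The `+ 8` in `keyOffset` matches the `+ 8` in the quoted `getKeyOffsetForFixedLengthRecords`, which skips the same 8-byte header.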
[GitHub] spark issue #14251: [SPARK-16602][SQL] `Nvl` function should support various...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14251 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #14251: [SPARK-16602][SQL] `Nvl` function should support various...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14251 **[Test build #62495 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62495/consoleFull)** for PR 14251 at commit [`e3ed851`](https://github.com/apache/spark/commit/e3ed851f5c57970342d665f7087e1f0743d92f70). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253615 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; + private Object currentAndOnlyBase = null; + private long recordStartOffset; + private long pageCursor = 0; + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, DEFAULT_CAPACITY, manager); + } + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager, int maxRows) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager); + } + + public int numRows() { return numRows; } + + public void close() { +if (currentAndOnlyPage != null) { + freePage(currentAndOnlyPage); + currentAndOnlyPage = null; +} + } + + private boolean acquireNewPage(long required) { +try { + currentAndOnlyPage = allocatePage(required); +} catch (OutOfMemoryError e) { + return false; +} +currentAndOnlyBase = currentAndOnlyPage.getBaseObject(); +Platform.putInt(currentAndOnlyBase, currentAndOnlyPage.getBaseOffset(), 0); +pageCursor = 4; +recordStartOffset = pageCursor + currentAndOnlyPage.getBaseOffset(); + 
+return true; + } + + private long getKeyOffsetForFixedLengthRecords(int rowId) { +return recordStartOffset + rowId * recordLength + 8; + } + + public UnsafeRow appendRow(Object kbase, long koff, int klen, + Object vbase, long voff, int vlen) { +final long recordLength = 8 + klen + vlen + 8; +// if run out of max supported rows or page size, return null +
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253373 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; + private Object currentAndOnlyBase = null; + private long recordStartOffset; + private long pageCursor = 0; + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, DEFAULT_CAPACITY, manager); + } + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager, int maxRows) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager); + } + + public int numRows() { return numRows; } + + public void close() { +if (currentAndOnlyPage != null) { + freePage(currentAndOnlyPage); + currentAndOnlyPage = null; +} + } + + private boolean acquireNewPage(long required) { +try { + currentAndOnlyPage = allocatePage(required); +} catch (OutOfMemoryError e) { + return false; +} +currentAndOnlyBase = currentAndOnlyPage.getBaseObject(); +Platform.putInt(currentAndOnlyBase, currentAndOnlyPage.getBaseOffset(), 0); +pageCursor = 4; +recordStartOffset = pageCursor + currentAndOnlyPage.getBaseOffset(); + 
+return true; + } + + private long getKeyOffsetForFixedLengthRecords(int rowId) { +return recordStartOffset + rowId * recordLength + 8; + } + + public UnsafeRow appendRow(Object kbase, long koff, int klen, + Object vbase, long voff, int vlen) { +final long recordLength = 8 + klen + vlen + 8; +// if run out of max supported rows or page size, return null +
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253434 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; + private Object currentAndOnlyBase = null; + private long recordStartOffset; + private long pageCursor = 0; + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, DEFAULT_CAPACITY, manager); + } + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager, int maxRows) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager); + } + + public int numRows() { return numRows; } + + public void close() { +if (currentAndOnlyPage != null) { + freePage(currentAndOnlyPage); + currentAndOnlyPage = null; +} + } + + private boolean acquireNewPage(long required) { +try { + currentAndOnlyPage = allocatePage(required); +} catch (OutOfMemoryError e) { + return false; +} +currentAndOnlyBase = currentAndOnlyPage.getBaseObject(); +Platform.putInt(currentAndOnlyBase, currentAndOnlyPage.getBaseOffset(), 0); +pageCursor = 4; +recordStartOffset = pageCursor + currentAndOnlyPage.getBaseOffset(); + 
+return true; + } + + private long getKeyOffsetForFixedLengthRecords(int rowId) { +return recordStartOffset + rowId * recordLength + 8; + } + + public UnsafeRow appendRow(Object kbase, long koff, int klen, --- End diff -- Can we add a small comment saying that this returns the value row?
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253283 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; + private Object currentAndOnlyBase = null; + private long recordStartOffset; + private long pageCursor = 0; + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, DEFAULT_CAPACITY, manager); + } + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager, int maxRows) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager); + } + + public int numRows() { return numRows; } + + public void close() { +if (currentAndOnlyPage != null) { + freePage(currentAndOnlyPage); + currentAndOnlyPage = null; +} + } + + private boolean acquireNewPage(long required) { +try { + currentAndOnlyPage = allocatePage(required); +} catch (OutOfMemoryError e) { + return false; +} +currentAndOnlyBase = currentAndOnlyPage.getBaseObject(); +Platform.putInt(currentAndOnlyBase, currentAndOnlyPage.getBaseOffset(), 0); +pageCursor = 4; +recordStartOffset = pageCursor + currentAndOnlyPage.getBaseOffset(); + 
+return true; + } + + private long getKeyOffsetForFixedLengthRecords(int rowId) { +return recordStartOffset + rowId * recordLength + 8; --- End diff -- Let's explicitly cast `recordLength` to long to prevent inadvertent overflow when multiplying two ints.
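The cast matters because of how Java evaluates `recordStartOffset + rowId * recordLength + 8`. The product of two `int`s is computed in 32-bit arithmetic and only then widened for the `long` addition, so it can wrap first. `OffsetOverflowDemo` is hypothetical. The sizes in it are chosen only to force a wrap. Whether realistic page sizes and capacities can reach it is a separate question.

```java
/** Hypothetical demo: the same offset formula with int vs long multiplication. */
final class OffsetOverflowDemo {
  // Same shape as the quoted code: rowId * recordLength is an int * int product,
  // evaluated (and possibly wrapped) before it is widened for the long addition.
  static long offsetIntMultiply(long recordStartOffset, int rowId, int recordLength) {
    return recordStartOffset + rowId * recordLength + 8;
  }

  // Casting one operand to long makes the multiplication itself 64-bit.
  static long offsetLongMultiply(long recordStartOffset, int rowId, int recordLength) {
    return recordStartOffset + rowId * (long) recordLength + 8;
  }

  public static void main(String[] args) {
    // 70_000 * 40_000 = 2_800_000_000, which exceeds Integer.MAX_VALUE.
    System.out.println(offsetIntMultiply(0L, 70_000, 40_000));  // -1494967288
    System.out.println(offsetLongMultiply(0L, 70_000, 40_000)); // 2800000008
  }
}
```

For small values the two versions agree. The difference only shows up once the product passes `Integer.MAX_VALUE`.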
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253225 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; + private Object currentAndOnlyBase = null; + private long recordStartOffset; + private long pageCursor = 0; + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, DEFAULT_CAPACITY, manager); + } + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager, int maxRows) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager); + } + + public int numRows() { return numRows; } + + public void close() { +if (currentAndOnlyPage != null) { + freePage(currentAndOnlyPage); + currentAndOnlyPage = null; +} + } + + private boolean acquireNewPage(long required) { +try { + currentAndOnlyPage = allocatePage(required); +} catch (OutOfMemoryError e) { + return false; --- End diff -- Let's also log the exception here.
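Here is a minimal sketch of the logging suggestion. It makes two assumptions. First, `java.util.logging` stands in for whatever logger the class actually uses. Second, a hypothetical `LongFunction` stands in for `MemoryConsumer.allocatePage`. The sketch keeps the boolean contract but records the error and the requested size before returning `false`.

```java
import java.util.function.LongFunction;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch: report an allocation failure before signalling it with a boolean. */
final class PageAcquireSketch {
  private static final Logger LOG = Logger.getLogger(PageAcquireSketch.class.getName());

  // Hypothetical stand-in for MemoryConsumer.allocatePage; returns the new page.
  private final LongFunction<Object> allocatePage;
  private Object page;

  PageAcquireSketch(LongFunction<Object> allocatePage) {
    this.allocatePage = allocatePage;
  }

  boolean acquireNewPage(long requiredSize) {
    try {
      page = allocatePage.apply(requiredSize);
    } catch (OutOfMemoryError e) {
      // Log instead of silently swallowing, so the caller's fallback path is diagnosable.
      LOG.log(Level.WARNING, "Failed to allocate a page of " + requiredSize + " bytes", e);
      return false;
    }
    return true;
  }

  boolean hasPage() {
    return page != null;
  }

  public static void main(String[] args) {
    PageAcquireSketch ok = new PageAcquireSketch(size -> new byte[(int) size]);
    System.out.println(ok.acquireNewPage(1024)); // true
    PageAcquireSketch failing =
        new PageAcquireSketch(size -> { throw new OutOfMemoryError("simulated"); });
    System.out.println(failing.acquireNewPage(1024)); // false, after a WARNING log entry
  }
}
```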
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253164 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; + private Object currentAndOnlyBase = null; + private long recordStartOffset; + private long pageCursor = 0; + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, DEFAULT_CAPACITY, manager); + } + + public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema, + TaskMemoryManager manager, int maxRows) { +return new RowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager); + } + + public int numRows() { return numRows; } + + public void close() { +if (currentAndOnlyPage != null) { + freePage(currentAndOnlyPage); + currentAndOnlyPage = null; +} + } + + private boolean acquireNewPage(long required) { --- End diff -- nit: rename `required` to `requiredSize`.
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253124 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ + private static final int DEFAULT_CAPACITY = 1 << 16; + private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; + + private final StructType keySchema; + private final StructType valueSchema; + private final int capacity; + private int numRows = 0; + + // Staging row returned from getRow. + final UnsafeRow keyRow; + final UnsafeRow valueRow; + + // ids for current key row and value row being retrieved + private int keyRowId = -1; + + // full addresses for key rows and value rows + private long[] keyOffsets; + + // if all data types in the schema are fixed length + private boolean allFixedLength; + private int klen; + private int vlen; + private int recordLength; + + private MemoryBlock currentAndOnlyPage = null; --- End diff -- It might be okay to just call them `page` and `base` for brevity. We should make it clear in the class docs why that's the current/only page.
[GitHub] spark pull request #12944: [SPARK-15074][Shuffle] Cache shuffle index file t...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/12944#discussion_r71253134 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexCache.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.LongBuffer; +import java.util.concurrent.ExecutionException; + +/** + * Maintains an LRU cache of {@link ShuffleIndexInformation} so that + * we can avoid open/close of the index files for each block fetch. + */ +public class ShuffleIndexCache { + + LoadingCache indexCache; + + public ShuffleIndexCache(long cacheSize) { +CacheLoader loader = --- End diff -- nit: does it work to have `File` as a key? That seems cleaner than a raw string. 
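A note on the `File`-as-key question: `java.io.File` overrides `equals` and `hashCode` to compare abstract pathnames, so it can serve as a map or cache key. The caveat is that paths are not canonicalized, so `a/../b` and `b` count as different keys. The minimal sketch below assumes that callers always build the same path for the same index file. It uses a `LinkedHashMap` in access order as a standard-library stand-in for Guava's `LoadingCache`. `FileKeyedLruCache` and its `Loader` interface are hypothetical names, not Spark code.

```java
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical LRU cache keyed by File; a stdlib stand-in for Guava's LoadingCache. */
final class FileKeyedLruCache<V> {
  /** Loader that may throw IOException, mirroring CacheLoader.load in the quoted diff. */
  interface Loader<V> {
    V load(File file) throws IOException;
  }

  private final Loader<V> loader;
  private final Map<File, V> entries;

  FileKeyedLruCache(final int maxEntries, Loader<V> loader) {
    this.loader = loader;
    // accessOrder = true: iteration order runs from least- to most-recently accessed,
    // so the "eldest" entry is the least recently used one.
    this.entries = new LinkedHashMap<File, V>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<File, V> eldest) {
        return size() > maxEntries;
      }
    };
  }

  /** Returns the cached value for the file, loading it on a miss. Synchronized for simplicity. */
  synchronized V get(File file) throws IOException {
    V value = entries.get(file); // File.equals compares abstract pathnames
    if (value == null) {
      value = loader.load(file);
      entries.put(file, value);
    }
    return value;
  }

  synchronized int size() {
    return entries.size();
  }
}
```

Two `File` objects built from the same path string hit the same entry, so no conversion to a raw `String` key is needed.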
[GitHub] spark issue #13950: [SPARK-15487] [Web UI] Spark Master UI to reverse proxy ...
Github user ajbozarth commented on the issue: https://github.com/apache/spark/pull/13950 I'll try and take a look at this tomorrow
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71253029 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
+ * + * TODO: making each entry more compact, e.g., combine key and value into a single UnsafeRow + */ +public final class RowBasedKeyValueBatch extends MemoryConsumer{ --- End diff -- nit: `MemoryConsumer {`
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71252969 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 + * + * RowBasedKeyValueBatch will automatically acquire new pages (MemoryBlock) when the current page + * is used up. 
--- End diff -- Let's update/add docs about why this data structure is backed by a single page.
[GitHub] spark issue #13670: [SPARK-15951] Change Executors Page to use datatables to...
Github user ajbozarth commented on the issue: https://github.com/apache/spark/pull/13670 Sorry I couldn't get to this today; I'll do my best to take a look in the morning.
[GitHub] spark issue #14158: [SPARK-13547] [SQL] [WEBUI] Add SQL query in web UI's SQ...
Github user ajbozarth commented on the issue: https://github.com/apache/spark/pull/14158 I'll try to take a look at this tomorrow
[GitHub] spark pull request #14174: [SPARK-16524][SQL] Add RowBatch and RowBasedHashM...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/14174#discussion_r71252862 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; + + +/** + * RowBasedKeyValueBatch stores key value pairs in contiguous memory region. + * + * Each key or value is stored as a single UnsafeRow. The format for each record looks like this: + * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] + * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] + * [8 bytes pointer to next] + * Thus, record length = 8 + klen + vlen + 8 --- End diff -- nit: record length = 4 + 4 + klen + vlen + 8
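For reference, the corrected formula counts each fixed field separately: a 4-byte total-size int, a 4-byte key-size int, the key and value bytes, and an 8-byte next pointer. It is numerically equal to `8 + klen + vlen + 8`. The rewrite only makes the two 4-byte header fields visible. The sketch below writes one record in that layout to a plain heap `ByteBuffer`, assuming the field order given in the quoted doc comment. `RecordLayoutSketch` is a hypothetical helper, not part of `RowBasedKeyValueBatch`, and it does not use Spark's `MemoryBlock` or `Platform` APIs.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical sketch of the record layout described in the quoted
 * RowBasedKeyValueBatch doc comment, using a heap ByteBuffer instead of
 * Spark's MemoryBlock / Platform APIs.
 */
final class RecordLayoutSketch {
  static final int TOTAL_SIZE_BYTES = 4; // [4 bytes total size]
  static final int KEY_SIZE_BYTES = 4;   // [4 bytes key size = klen]
  static final int NEXT_PTR_BYTES = 8;   // [8 bytes pointer to next]

  /** Record length = 4 + 4 + klen + vlen + 8. */
  static int recordLength(int klen, int vlen) {
    return TOTAL_SIZE_BYTES + KEY_SIZE_BYTES + klen + vlen + NEXT_PTR_BYTES;
  }

  /** Appends one record at the buffer's current position and returns the record's start offset. */
  static int append(ByteBuffer page, byte[] key, byte[] value, long next) {
    int start = page.position();
    page.putInt(key.length + value.length + 4); // total-size field, as written in the doc comment
    page.putInt(key.length);                    // key size (klen)
    page.put(key);                              // key bytes (an UnsafeRow in Spark)
    page.put(value);                            // value bytes (an UnsafeRow in Spark)
    page.putLong(next);                         // pointer to the next record
    return start;
  }
}
```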
[GitHub] spark issue #14204: [SPARK-16520] [WEBUI] Link executors to corresponding wo...
Github user ajbozarth commented on the issue: https://github.com/apache/spark/pull/14204 I'll try to take a look at this tomorrow, but you're failing MiMa tests because you added a parameter to `ExecutorInfo`, which is part of the Developer API. You'll have to add an exclude for `ExecutorInfo` to `MimaExcludes` to pass MiMa.
[GitHub] spark pull request #12944: [SPARK-15074][Shuffle] Cache shuffle index file t...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/12944#discussion_r71251461 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexCache.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.LongBuffer; +import java.util.concurrent.ExecutionException; + +/** + * Maintains an LRU cache of {@link ShuffleIndexInformation} so that + * we can avoid open/close of the index files for each block fetch. + */ +public class ShuffleIndexCache { --- End diff -- Can we just inline this cache into ExternalShuffleBlockResolver? It doesn't seem necessary to add a new class file that just wraps the Guava cache.
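Here is one possible shape for the inlining suggestion, sketched under stated assumptions. The resolver keeps the LRU map as its own private field and does the offset lookup itself. The index file is assumed to be a sequence of big-endian longs in which block `i` spans `offsets[i]` to `offsets[i + 1]`. That matches the quoted `LongBuffer offsets` and `ShuffleIndexRecord(offset, length)`, but the diff does not confirm it. `InlinedIndexResolverSketch` and `blockRange` are hypothetical names, and a `LinkedHashMap` stands in for the Guava cache.

```java
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
import java.nio.file.Files;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical resolver that owns its index cache directly instead of
 * delegating to a separate wrapper class.
 */
final class InlinedIndexResolverSketch {
  private final Map<File, LongBuffer> indexCache;

  InlinedIndexResolverSketch(final int maxEntries) {
    // Access-ordered LinkedHashMap evicting the least recently used index file.
    this.indexCache = new LinkedHashMap<File, LongBuffer>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<File, LongBuffer> eldest) {
        return size() > maxEntries;
      }
    };
  }

  /** Returns {offset, length} of the given reduce partition's block. */
  synchronized long[] blockRange(File indexFile, int reduceId) throws IOException {
    LongBuffer offsets = indexCache.get(indexFile);
    if (offsets == null) {
      // Assumed format: big-endian longs, as produced by DataOutputStream.writeLong.
      offsets = ByteBuffer.wrap(Files.readAllBytes(indexFile.toPath())).asLongBuffer();
      indexCache.put(indexFile, offsets);
    }
    long offset = offsets.get(reduceId);       // absolute get: does not move the buffer position
    long nextOffset = offsets.get(reduceId + 1);
    return new long[] {offset, nextOffset - offset};
  }
}
```

Returning a two-element array keeps the sketch short. The quoted patch uses `ShuffleIndexRecord` for the same offset/length pair.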
[GitHub] spark pull request #12944: [SPARK-15074][Shuffle] Cache shuffle index file t...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/12944#discussion_r71251332 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import com.google.common.cache.LoadingCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.nio.ch.IOUtil; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.LongBuffer; + +/** + * Keeps the index information for a particular map output + * as an in-memory LongBuffer. + */ +public class ShuffleIndexInformation { + /** offsets as long buffer */ + private final LongBuffer offsets; + + public ShuffleIndexInformation(File indexFile) throws IOException{ --- End diff -- nit: space before the {
[GitHub] spark pull request #12944: [SPARK-15074][Shuffle] Cache shuffle index file t...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/12944#discussion_r71251280 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexCache.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.LongBuffer; +import java.util.concurrent.ExecutionException; + +/** + * Maintains an LRU cache of {@link ShuffleIndexInformation} so that + * we can avoid open/close of the index files for each block fetch. 
+ */ +public class ShuffleIndexCache { + + LoadingCache indexCache; + + public ShuffleIndexCache(long cacheSize) { +CacheLoader loader = + new CacheLoader() { +public ShuffleIndexInformation load(String file) throws IOException { + return new ShuffleIndexInformation(new File(file)); +} + }; +indexCache = CacheBuilder.newBuilder() +.maximumSize(cacheSize).build(loader); + --- End diff -- nit: extra newline
[GitHub] spark pull request #12944: [SPARK-15074][Shuffle] Cache shuffle index file t...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/12944#discussion_r71251245 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java --- @@ -66,6 +67,16 @@ @VisibleForTesting final ConcurrentMap executors; + /** + * Caches index file information so that we can avoid open/close the index files + * for each block fetch. + */ + private final ShuffleIndexCache shuffleIndexCache; + + // Max number of entries to keep in the index cache. + private static final String SPARK_SHUFFLE_SERVICE_INDEX_CACHE_ENTRIES = "spark.shuffle.service.index.cache.entries"; + private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_INDEX_CACHE_ENTRIES = 1024; --- End diff -- This conf seems a little out of place. Can it be co-located with the other spark.shuffle.* configs?
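To illustrate the co-location idea, a hypothetical holder class could keep the `spark.shuffle.*` keys and their defaults together instead of spreading them across resolver classes. The key name and default below come from the quoted diff. `ShuffleServiceConfSketch` and the plain `Map<String, String>` behind it are assumptions, not Spark's actual configuration API.

```java
import java.util.Map;

/**
 * Hypothetical holder that keeps spark.shuffle.* keys and their defaults in one place
 * (not an actual Spark class).
 */
final class ShuffleServiceConfSketch {
  // Max number of entries to keep in the shuffle index cache (key and default from the quoted diff).
  static final String INDEX_CACHE_ENTRIES_KEY = "spark.shuffle.service.index.cache.entries";
  static final int INDEX_CACHE_ENTRIES_DEFAULT = 1024;

  private final Map<String, String> settings;

  ShuffleServiceConfSketch(Map<String, String> settings) {
    this.settings = settings;
  }

  /** Returns the configured cache size, falling back to the default when the key is unset. */
  int indexCacheEntries() {
    String raw = settings.get(INDEX_CACHE_ENTRIES_KEY);
    return raw == null ? INDEX_CACHE_ENTRIES_DEFAULT : Integer.parseInt(raw.trim());
  }
}
```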
[GitHub] spark pull request #12944: [SPARK-15074][Shuffle] Cache shuffle index file t...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/12944#discussion_r71251177 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +/** + * Contains offset and length of the shuffle block data. + */ +public class ShuffleIndexRecord { + private final long offset; + private final long length; + + public ShuffleIndexRecord(long offset, long length) { +this.offset = offset; +this.length = length; + } + + public long getOffset() { +return offset; + } + + public long getLength() { +return length; + } +} --- End diff -- The convention seems to be to not have newlines at the end of files.
[GitHub] spark pull request #12944: [SPARK-15074][Shuffle] Cache shuffle index file t...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/12944#discussion_r71251123 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import com.google.common.cache.LoadingCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.nio.ch.IOUtil; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.LongBuffer; + +/** + * Keeps the index information for a particular map output + * as an in-memory LongBuffer. + */ +public class ShuffleIndexInformation { + /** offsets as long buffer */ + private final LongBuffer offsets; + + public ShuffleIndexInformation(File indexFile) throws IOException{ +int size = (int)indexFile.length(); +ByteBuffer buffer = ByteBuffer.allocate(size); +DataInputStream dis = new DataInputStream(new FileInputStream(indexFile)); +dis.readFully(buffer.array()); +dis.close(); --- End diff -- close() in finally block? 
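The `finally` suggestion can also be met with try-with-resources (Java 7+), which closes the stream even when `readFully` throws. Below is a minimal sketch that mirrors the quoted constructor. `IndexFileReaderSketch` and `readOffsets` are hypothetical names.

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;

/** Hypothetical helper mirroring the quoted ShuffleIndexInformation constructor. */
final class IndexFileReaderSketch {
  static LongBuffer readOffsets(File indexFile) throws IOException {
    int size = (int) indexFile.length();
    ByteBuffer buffer = ByteBuffer.allocate(size);
    // try-with-resources closes the stream even if readFully throws,
    // which is what an explicit close() in a finally block would achieve.
    try (DataInputStream dis = new DataInputStream(new FileInputStream(indexFile))) {
      dis.readFully(buffer.array());
    }
    return buffer.asLongBuffer();
  }
}
```

`ByteBuffer` uses big-endian byte order by default, which matches longs written with `DataOutputStream.writeLong`.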
[GitHub] spark issue #14251: [SPARK-16602][SQL] `Nvl` function should support various...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/14251 Ah. I see. I missed that.
[GitHub] spark issue #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14132 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62491/ Test PASSed.
[GitHub] spark issue #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14132 Merged build finished. Test PASSed.
[GitHub] spark issue #14132: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14132 **[Test build #62491 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62491/consoleFull)** for PR 14132 at commit [`e6a44ed`](https://github.com/apache/spark/commit/e6a44ed3d97692f033ea4b1832b08527e6157aa2). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #14251: [SPARK-16602][SQL] `Nvl` function should support various...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/14251 @dongjoon-hyun Again, we cannot change the type checking for coalesce. Let's not confuse analyzer-level type coercion with type checking in expressions for resolution.
[GitHub] spark issue #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite to chec...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14235 Merged build finished. Test PASSed.
[GitHub] spark issue #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite to chec...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14235 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62493/
[GitHub] spark issue #13986: [SPARK-16617] Upgrade to Avro 1.8.1
Github user benmccann commented on the issue: https://github.com/apache/spark/pull/13986

> At least run the script that updates the deps/ files to see that impact

Can you point me to the script you're referring to?

> Figure out what the upside is in updating to 1.8

The reason I'm proposing this change is that Avro 1.8 makes the generated classes serializable so that you can create RDDs containing Avro objects.

> Note any potential incompatibilities here

Here's the list of incompatible changes from the [changelog](https://github.com/apache/avro/blob/master/CHANGES.txt):

* AVRO-1334. Java: Update versions of many dependencies. (scottcarey, cutting)
* AVRO-997. Java: For enum values, no longer sometimes permit any Object whose toString() names an enum symbol, but rather always require use of distinct enum types. (Sean Busbey via cutting)
* AVRO-1602. Java: Remove Dapper-style RPC trace facility. This seems unused and has been a source of build problems. (cutting)
* AVRO-1586. Build against Hadoop 2. With this change the avro-mapred and trevni-avro JARs without a hadoop1 or hadoop2 Maven classifier are Hadoop 2 artifacts. To use with Hadoop 1, set the classifier to hadoop1. (tomwhite)
* AVRO-1502. Java: Generated classes now implement Serializable. Generated classes need to be regenerated to use this release. (cutting)
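Why a generated class implementing `Serializable` matters can be shown with a plain Java-serialization round trip, the mechanism Spark's default `JavaSerializer` builds on. This is a hedged sketch: `LegacyUser` and `User` are hypothetical stand-ins for an Avro-generated record class before and after the AVRO-1502 change, and `JavaSerDe` is an illustrative helper, not part of Avro or Spark.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins for an Avro-generated record class:
// LegacyUser does not implement Serializable; User does (as AVRO-1502 describes).
class LegacyUser(val name: String)
class User(val name: String) extends Serializable

object JavaSerDe {
  // Writes the object with Java serialization, then reads it back.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // True if the object survives a round trip; false if Java serialization rejects its class.
  def isSerializable(obj: AnyRef): Boolean =
    try { roundTrip(obj); true }
    catch { case _: NotSerializableException => false }
}
```

Here `JavaSerDe.isSerializable(new LegacyUser("ada"))` is `false` because `writeObject` throws `NotSerializableException`, while a `User` round-trips and keeps its field.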
[GitHub] spark issue #14235: [SPARK-16590][SQL] Improve LogicalPlanToSQLSuite to chec...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14235 **[Test build #62493 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62493/consoleFull)** for PR 14235 at commit [`efaa4d0`](https://github.com/apache/spark/commit/efaa4d0d55373280e19ed38b7e192545e4a3a6af). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #14251: [SPARK-16602][SQL] `Nvl` function should support ...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/14251#discussion_r71247244

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2965,4 +2965,19 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-16602 Nvl/Coalesce") {
+    // NVL uses the first parameter data type.
+    checkAnswer(sql("select nvl('0', 1)"), Row("0"))
+    checkAnswer(sql("select nvl(0, '1')"), Row(0))
+    checkAnswer(sql("select nvl(null, '1')"), Row("1"))
+    checkAnswer(sql("select nvl(null, 1.1)"), Row(1.1))
+
+    // Coalesce do TypeCoercion to make them homogeneous.
+    checkAnswer(sql("select coalesce('0', 1)"), Row("0"))
+    checkAnswer(sql("select coalesce(0, '1')"), Row("0"))
+    checkAnswer(sql("select coalesce(null, '1')"), Row("1"))
+    checkAnswer(sql("select coalesce(null, 1.1)"), Row(1.1))
+    checkAnswer(sql("select coalesce(null, 1.1, '1')"), Row("1.1"))
--- End diff --

@rxin, here I added the comparison tests.
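The typing behavior the proposed test encodes (NVL keeps the first argument's type, COALESCE first coerces all arguments to one common type) can be summarized with a small model. This is a hypothetical sketch: `SqlType`, `Lit`, and `NvlVsCoalesce` are illustrative names, and the widening order (any String makes the result String, otherwise Int widens to Double) is an assumption chosen to reproduce the expected rows in the test, not Spark's full coercion table.

```scala
// Hypothetical mini-model of the typing rules in the proposed test; not Spark code.
sealed trait SqlType
case object NullType extends SqlType
case object IntType extends SqlType
case object DoubleType extends SqlType
case object StringType extends SqlType

// A literal argument: `value` is None for SQL NULL.
final case class Lit(value: Option[Any], t: SqlType)

object NvlVsCoalesce {
  // Assumed casts, covering only the pairs this sketch needs.
  private def cast(v: Any, from: SqlType, to: SqlType): Any = (from, to) match {
    case (f, t) if f == t         => v
    case (_, StringType)          => v.toString
    case (IntType, DoubleType)    => v.asInstanceOf[Int].toDouble
    case (StringType, IntType)    => v.asInstanceOf[String].toInt
    case (StringType, DoubleType) => v.asInstanceOf[String].toDouble
    case (f, t)                   => throw new IllegalArgumentException(s"no cast from $f to $t")
  }

  // Assumed widening used to pick one common type for COALESCE.
  private def widen(a: SqlType, b: SqlType): SqlType = (a, b) match {
    case (x, y) if x == y                  => x
    case (NullType, y)                     => y
    case (x, NullType)                     => x
    case (StringType, _) | (_, StringType) => StringType
    case _                                 => DoubleType
  }

  // Returns the first non-NULL argument, cast to the target type.
  private def firstNonNull(args: Seq[Lit], target: SqlType): Option[Any] =
    args.collectFirst { case Lit(Some(v), t) => cast(v, t, target) }

  // NVL as the test describes it: the result type comes from the first argument
  // (or from the second, if the first is a bare NULL literal).
  def nvl(a: Lit, b: Lit): Option[Any] = {
    val target = if (a.t == NullType) b.t else a.t
    firstNonNull(Seq(a, b), target)
  }

  // COALESCE: coerce every argument to one common type, then pick the first non-NULL.
  def coalesce(args: Lit*): Option[Any] = {
    val target = args.map(_.t).foldLeft(NullType: SqlType)(widen)
    firstNonNull(args, target)
  }
}
```

Under these assumptions, `nvl(0, '1')` yields the integer `0`, while `coalesce(0, '1')` yields the string `"0"`, and `coalesce(null, 1.1, '1')` yields `"1.1"`, matching the rows the test expects.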
[GitHub] spark pull request #14250: [SPARKR][DOCS] minor code sample update in R prog...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14250
[GitHub] spark issue #14251: [SPARK-16602][SQL] `Nvl` function should support various...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14251 **[Test build #62499 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62499/consoleFull)** for PR 14251 at commit [`85e3144`](https://github.com/apache/spark/commit/85e31447c16c4879ba0149533f21b1e57ffe3186).