[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19424 **[Test build #82444 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82444/testReport)** for PR 19424 at commit [`75457f6`](https://github.com/apache/spark/commit/75457f608b9068ad1b3b3eb129f14d4e1b4ed946).
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142592915

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
+
+case class FlatMapGroupsInPandasExec(
+    groupingAttributes: Seq[Attribute],
+    func: Expression,
+    output: Seq[Attribute],
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  private val pandasFunction = func.asInstanceOf[PythonUDF].func
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val inputRDD = child.execute()
+
+    val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+    val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+    val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+    val argOffsets = Array((0 until child.schema.length).toArray)
+
+    inputRDD.mapPartitionsInternal { iter =>
+      val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+      val context = TaskContext.get()
+
+      val columnarBatchIter = new ArrowPythonRunner(
+        chainedFunc, bufferSize, reuseWorker,
+        PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema)
+        .compute(grouped.map(_._2), context.partitionId(), context)
+
+      val rowIter = new Iterator[InternalRow] {
+        private var currentIter = if (columnarBatchIter.hasNext) {
+          val batch = columnarBatchIter.next()
+          batch.rowIterator.asScala
--- End diff --

Don't we need to check the returned schema?
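For readers following the review: the question is about validating what comes back from the Python worker. Below is a minimal sketch of the kind of guard being asked for; the object name, method, and call site are assumptions for illustration, not code from this PR.

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical guard: compare the schema of the data returned by the Python
// worker against the schema this operator declared in `output`, failing fast
// instead of emitting misaligned rows downstream.
object ReturnSchemaCheck {
  def requireSchemaMatches(actual: StructType, expected: StructType): Unit = {
    require(actual == expected,
      s"pandas_udf returned data with schema $actual, but the declared returnType is $expected")
  }
}
```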
[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/19424

[SPARK-22197][SQL] push down operators to data source before planning

## What changes were proposed in this pull request?

As we discussed in https://github.com/apache/spark/pull/19136#discussion_r137023744, we should push down operators to the data source before planning, so that the data source can report more accurate statistics.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark follow

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19424.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19424

commit 571fad95c2dac6f70840da9c68cfb1be1358ecaf
Author: Wenchen Fan
Date: 2017-09-27T12:50:15Z
improve documents and minor clean up

commit 75457f608b9068ad1b3b3eb129f14d4e1b4ed946
Author: Wenchen Fan
Date: 2017-10-04T01:37:25Z
push down operators to data source before planning
[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19424 cc @rdblue @gatorsmile
[GitHub] spark issue #17455: [Spark-20044][Web UI] Support Spark UI behind front-end ...
Github user okoethibm commented on the issue: https://github.com/apache/spark/pull/17455 @jiangxb1987 I stopped working on Spark-related efforts a while ago and will not be following up on this PR any more. Anyone interested is free to take over.
[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19416#discussion_r142583033

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow}
+import org.apache.spark.sql.execution.ObjectOperator
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
+
+
+class FlatMapGroupsWithState_StateManager(
+    stateEncoder: ExpressionEncoder[Any],
+    shouldStoreTimestamp: Boolean) extends Serializable {
+
+  val stateSchema = {
+    val schema = new StructType().add("groupState", stateEncoder.schema, nullable = true)
+    if (shouldStoreTimestamp) schema.add("timeoutTimestamp", LongType) else schema
+  }
+
+  def getState(store: StateStore, keyRow: UnsafeRow): FlatMapGroupsWithState_StateData = {
+    val stateRow = store.get(keyRow)
+    stateDataForGets.withNew(
+      keyRow, stateRow, getStateObj(stateRow), getTimestamp(stateRow))
+  }
+
+  def putState(store: StateStore, keyRow: UnsafeRow, state: Any, timestamp: Long): Unit = {
+    val stateRow = getStateRow(state)
+    setTimestamp(stateRow, timestamp)
+    store.put(keyRow, stateRow)
+  }
+
+  def removeState(store: StateStore, keyRow: UnsafeRow): Unit = {
+    store.remove(keyRow)
+  }
+
+  def getAllState(store: StateStore): Iterator[FlatMapGroupsWithState_StateData] = {
+    val stateDataForGetAllState = FlatMapGroupsWithState_StateData()
+    store.getRange(None, None).map { pair =>
+      stateDataForGetAllState.withNew(
+        pair.key, pair.value, getStateObjFromRow(pair.value), getTimestamp(pair.value))
+    }
+  }
+
+  private val stateAttributes: Seq[Attribute] = stateSchema.toAttributes
+
+  // Get the serializer for the state, taking into account whether we need to save timestamps
+  private val stateSerializer = {
+    val nestedStateExpr = CreateNamedStruct(
+      stateEncoder.namedExpressions.flatMap(e => Seq(Literal(e.name), e)))
+    if (shouldStoreTimestamp) {
+      Seq(nestedStateExpr, Literal(GroupStateImpl.NO_TIMESTAMP))
+    } else {
+      Seq(nestedStateExpr)
+    }
+  }
+
+  // Get the deserializer for the state. Note that this must be done in the driver, as
+  // resolving and binding of deserializer expressions to the encoded type can be safely done
+  // only in the driver.
+  private val stateDeserializer = {
+    val boundRefToNestedState = BoundReference(nestedStateOrdinal, stateEncoder.schema, true)
+    val deser = stateEncoder.resolveAndBind().deserializer.transformUp {
+      case BoundReference(ordinal, _, _) => GetStructField(boundRefToNestedState, ordinal)
+    }
+    CaseWhen(Seq(IsNull(boundRefToNestedState) -> Literal(null)), elseValue = deser).toCodegen()
+  }
+
+  private lazy val nestedStateOrdinal = 0
+  private lazy val timeoutTimestampOrdinal = 1
+
+  // Converters for translating state between rows and Java objects
+  private lazy val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
+    stateDeserializer, stateAttributes)
+  private lazy val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
+
+  private lazy val stateDataForGets = FlatMapGroupsWithState_StateData()
+
+  /** Returns the state as Java object if defined */
+  private def getStateObj(stateRow: UnsafeRow): Any = {
+    i
[GitHub] spark pull request #19422: [SPARK-22193][SQL] Minor typo fix
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19422
[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19422 Merged to master
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142583949

--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
             jgd = self._jgd.pivot(pivot_col)
         else:
             jgd = self._jgd.pivot(pivot_col, values)
-        return GroupedData(jgd, self.sql_ctx)
+        return GroupedData(jgd, self)
+
+    def apply(self, udf):
+        """
+        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
+        as a :class:`DataFrame`.
+
+        The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`.
+        Each group is passed as a `pandas.DataFrame` to the user-function and the returned
+        `pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame`
+        can be arbitrary length and its schema should match the returnType of the pandas udf.
+
+        :param udf: A wrapped function returned by `pandas_udf`
+
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ...     ("id", "v"))
+        >>> @pandas_udf(returnType=df.schema)
+        ... def normalize(pdf):
+        ...     v = pdf.v
+        ...     return pdf.assign(v=(v - v.mean()) / v.std())
+        >>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
--- End diff --

And two spaces before the inlined comment.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142583906

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -44,14 +63,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
     val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
       .map { case (attr, i) => attr.withName(s"_$i") })

+    val batchSize = conf.arrowMaxRecordsPerBatch
+    val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
--- End diff --

Ah, I see. Then please see my comment at https://github.com/apache/spark/pull/18732#discussion_r142578554.
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19404 Merged build finished. Test PASSed.
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19404 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82439/ Test PASSed.
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19404 **[Test build #82439 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82439/testReport)** for PR 19404 at commit [`a2d5bc7`](https://github.com/apache/spark/commit/a2d5bc706987accaeb6c9516e7d4a07b5bb3104f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142583590

--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
             jgd = self._jgd.pivot(pivot_col)
         else:
             jgd = self._jgd.pivot(pivot_col, values)
-        return GroupedData(jgd, self.sql_ctx)
+        return GroupedData(jgd, self)
--- End diff --

A good catch.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142583338

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -44,14 +63,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
     val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
       .map { case (attr, i) => attr.withName(s"_$i") })

+    val batchSize = conf.arrowMaxRecordsPerBatch
+    val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
--- End diff --

This was actually my first implementation. However, it turns out I cannot hold references to input rows without copying them, so `grouped` doesn't work (`grouped` uses an ArrayBuffer to keep references).
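The row-reuse problem described above is easy to reproduce outside Spark. Here is a self-contained sketch (plain Scala; the class and values are illustrative, not Spark APIs) of why buffering a reusing iterator without copying goes wrong, which is what `grouped` would do with Spark's reused `InternalRow` instances:

```scala
// An iterator that, like Spark's unsafe row iterators, hands out the same
// mutable object on every call to next().
final class ReusingIterator(n: Int) extends Iterator[Array[Int]] {
  private val buf = new Array[Int](1)
  private var i = 0
  def hasNext: Boolean = i < n
  def next(): Array[Int] = { buf(0) = i; i += 1; buf } // same array every time
}

object RowReuseDemo {
  def main(args: Array[String]): Unit = {
    // grouped() buffers references, so every element ends up as the last value.
    println(new ReusingIterator(3).grouped(3).next().map(_(0)).toList)               // List(2, 2, 2)
    // Copying before buffering preserves each element, at the cost of a per-row copy.
    println(new ReusingIterator(3).map(_.clone()).grouped(3).next().map(_(0)).toList) // List(0, 1, 2)
  }
}
```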
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19083 **[Test build #82443 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82443/testReport)** for PR 19083 at commit [`09ae105`](https://github.com/apache/spark/commit/09ae105c101a1b31d2a8873976c01590c50411d2).
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18732 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82440/ Test FAILed.
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18732 Merged build finished. Test FAILed.
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18732 **[Test build #82440 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82440/testReport)** for PR 18732 at commit [`21fed0d`](https://github.com/apache/spark/commit/21fed0dfeefe5775d193d7bb4176c38f0c2b91eb).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19083#discussion_r142582571

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -380,23 +380,26 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co

   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
-    if (ctx.isTooLongGeneratedFunction) {
-      logWarning("Found too long generated codes and JIT optimization might not work, " +
-        "Whole-stage codegen disabled for this plan, " +
-        "You can change the config spark.sql.codegen.MaxFunctionLength " +
-        "to adjust the function length limit:\n "
-        + s"$treeString")
-      return child.execute()
-    }
     // try to compile and fallback if it failed
-    try {
+    val (_, maxCodeSize) = try {
       CodeGenerator.compile(cleanedSource)
     } catch {
       case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
         // We should already saw the error message
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()
     }
+
+    // Check if compiled code has a too large function
+    if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
+      logWarning(s"Found too long generated codes and JIT optimization might not work: " +
+        s"the bytecode size was $maxCodeSize, this value went over the limit " +
+        s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disable " +
+        s"for this plan. To avoid this, you can set the limit " +
--- End diff --

`set ` -> `raise `. then, remove `higher`
[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19083#discussion_r142582458

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -380,23 +380,26 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co

   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
-    if (ctx.isTooLongGeneratedFunction) {
-      logWarning("Found too long generated codes and JIT optimization might not work, " +
-        "Whole-stage codegen disabled for this plan, " +
-        "You can change the config spark.sql.codegen.MaxFunctionLength " +
-        "to adjust the function length limit:\n "
-        + s"$treeString")
-      return child.execute()
-    }
     // try to compile and fallback if it failed
-    try {
+    val (_, maxCodeSize) = try {
      CodeGenerator.compile(cleanedSource)
     } catch {
       case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
         // We should already saw the error message
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()
     }
+
+    // Check if compiled code has a too large function
+    if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
+      logWarning(s"Found too long generated codes and JIT optimization might not work: " +
+        s"the bytecode size was $maxCodeSize, this value went over the limit " +
+        s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disable " +
--- End diff --

`disable ` -> `disabled `
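Applying both review suggestions above, the warning would read roughly as follows. This is a sketch of the corrected string only; the continuation after "raise the limit" is an assumption based on the reviewers' remarks, not the merged code:

```scala
// Sketch of the warning with "disable" -> "disabled" and "set" -> "raise" applied.
logWarning(s"Found too long generated codes and JIT optimization might not work: " +
  s"the bytecode size was $maxCodeSize, this value went over the limit " +
  s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
  s"for this plan. To avoid this, you can raise the limit " +
  s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
```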
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142581225

--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
             jgd = self._jgd.pivot(pivot_col)
         else:
             jgd = self._jgd.pivot(pivot_col, values)
-        return GroupedData(jgd, self.sql_ctx)
+        return GroupedData(jgd, self)
+
+    def apply(self, udf):
+        """
+        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
+        as a :class:`DataFrame`.
+
+        The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`.
+        Each group is passed as a `pandas.DataFrame` to the user-function and the returned
+        `pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame`
+        can be arbitrary length and its schema should match the returnType of the pandas udf.
+
+        :param udf: A wrapped function returned by `pandas_udf`
+
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ...     ("id", "v"))
+        >>> @pandas_udf(returnType=df.schema)
+        ... def normalize(pdf):
+        ...     v = pdf.v
+        ...     return pdf.assign(v=(v - v.mean()) / v.std())
+        >>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
--- End diff --

`# doctest: +SKIP`?
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18732 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82438/ Test FAILed.
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18732 Merged build finished. Test FAILed.
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18732 **[Test build #82438 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82438/testReport)** for PR 18732 at commit [`4943ceb`](https://github.com/apache/spark/commit/4943ceb8b57041a7bf911e8f8637380c5fc263ff).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19327
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/18931 ping @gatorsmile @cloud-fan for review. Thanks.
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/17819 @gatorsmile Does the SQL change look good to you? Thanks.
[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.
Github user tdas commented on the issue: https://github.com/apache/spark/pull/19327 LGTM. Merging to master.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142579512

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
@@ -519,3 +519,18 @@ case class CoGroup(
     outputObjAttr: Attribute,
     left: LogicalPlan,
     right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
+    groupingAttributes: Seq[Attribute],
+    functionExpr: Expression,
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+  /**
+   * This is needed because output attributes is considered `reference` when
+   * passed through the constructor.
+   *
+   * Without this, catalyst will complain that output attributes are missing
+   * from the input.
+   */
+  override val producedAttributes = AttributeSet(output)
--- End diff --

Not quite sure I understand this correctly. Why is `output` considered part of `references`?
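For context on the question: Catalyst flags a plan as unresolved when it references attributes that neither its children nor the node itself provide, roughly `missing = references -- childOutput -- producedAttributes`. Below is a toy, framework-free model of why declaring `producedAttributes` silences the complaint; all names are illustrative, not Catalyst APIs:

```scala
// Toy model of Catalyst's missing-input check using plain string sets.
object MissingInputDemo {
  def main(args: Array[String]): Unit = {
    val references = Set("id", "v_out")  // attributes the node mentions (constructor args count)
    val childOutput = Set("id", "v")     // attributes the child plan provides
    val produced = Set("v_out")          // attributes the node declares it produces itself

    val missing = references -- childOutput -- produced
    println(missing) // Set() -- without `produced`, this would be Set(v_out) and resolution fails
  }
}
```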
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19083 Merged build finished. Test PASSed.
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19083 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82437/ Test PASSed.
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19083 **[Test build #82437 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82437/testReport)** for PR 19083 at commit [`fca22b7`](https://github.com/apache/spark/commit/fca22b767fddb061303cddd4e06c87130b1b32dc).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #19395: [SPARK-22171] [SQL] Describe Table Extended Failed when ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19395 Thanks! Merged to master.
[GitHub] spark pull request #19395: [SPARK-22171] [SQL] Describe Table Extended Faile...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19395
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19083 **[Test build #82442 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82442/testReport)** for PR 19083 at commit [`433f13b`](https://github.com/apache/spark/commit/433f13b03e995bbb47641b44ed1f7961cc4ea2ec).
[GitHub] spark pull request #19419: [SPARK-22188] [CORE] Adding security headers for ...
Github user krishna-pandey commented on a diff in the pull request: https://github.com/apache/spark/pull/19419#discussion_r142578623

--- Diff: core/src/main/scala/org/apache/spark/ui/JettyUtils.scala ---
@@ -89,6 +92,9 @@ private[spark] object JettyUtils extends Logging {
             val result = servletParams.responder(request)
             response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
             response.setHeader("X-Frame-Options", xFrameOptionsValue)
+            response.setHeader("X-XSS-Protection", xXssProtectionValue.get)
--- End diff --

@srowen Added a check to set the header only if the Option is defined, and tested it locally. Thanks for the review.
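A minimal sketch of the Option-guarded header write the comment describes; the helper name is hypothetical, and only the `foreach`-instead-of-`.get` pattern is the point:

```scala
import javax.servlet.http.HttpServletResponse

// Emit the header only when the config Option is defined, rather than calling
// .get on a possibly empty Option (which throws NoSuchElementException).
def setOptionalHeader(response: HttpServletResponse, name: String, value: Option[String]): Unit = {
  value.foreach(v => response.setHeader(name, v))
}
```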
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142578625

--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
             jgd = self._jgd.pivot(pivot_col)
         else:
             jgd = self._jgd.pivot(pivot_col, values)
-        return GroupedData(jgd, self.sql_ctx)
+        return GroupedData(jgd, self)
--- End diff --

`return GroupedData(jgd, self._df)`?
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142578554

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType

+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

If we really don't want to use `GroupedIterator` as suggested in https://github.com/apache/spark/pull/18732/files#r142577976, we should add a clear warning comment on it.
[GitHub] spark issue #19423: Branch 2.2
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19423 @engineeyao, close this please.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142578363

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -111,6 +111,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
   }

   def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+    // FlatMapGroupsInPandas and be evaluated in python worker
--- End diff --

The wording of this comment seems wrong.
[GitHub] spark issue #15666: [SPARK-11421] [Core][Python][R] Added ability for addJar...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/15666 Thanks for asking this. I completely forgot this one. Will try to make some time to take a look within a few days.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142577976

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -44,14 +63,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
     val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
       .map { case (attr, i) => attr.withName(s"_$i") })

+    val batchSize = conf.arrowMaxRecordsPerBatch
+    val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
--- End diff --

Should we simply use `iter.grouped(batchSize).map(_.toIterator)`?
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142577791

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType

+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

The behavior of this `BatchIterator` seems a bit odd and doesn't follow the usual `Iterator` contract. For example, you can keep calling `next()` without ever consuming the previous batch to its end.
[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19083#discussion_r142577265

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---
@@ -151,7 +151,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     }
   }

-  def genGroupByCodeGenContext(caseNum: Int): CodegenContext = {
+  def genGroupByCodeGenContext(caseNum: Int): CodeAndComment = {
--- End diff --

`genGroupByCodeGenContext` -> `genGroupByCode`.
[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19083#discussion_r142577237

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---
@@ -17,10 +17,10 @@

 package org.apache.spark.sql.execution

-import org.apache.spark.sql.{Column, Dataset, Row}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import java.util.concurrent.ExecutionException
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodegenContext, CodeGenerator}
--- End diff --

We don't use `CodegenContext` anymore and can remove it.
[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19083#discussion_r142577111

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
@@ -333,33 +334,28 @@ class AggregateBenchmark extends BenchmarkBase {
       .sum()
       .collect()

-    benchmark.addCase(s"codegen = F") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
+    benchmark.addCase(s"hugeMethodLimit = 8000") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "8000")
       f()
     }

-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 1") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1")
-      f()
-    }
-
-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 1500") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1500")
+    benchmark.addCase(s"hugeMethodLimit = 16000") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "16000")
       f()
     }

     benchmark.run()

     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1
-    Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
-    max function length of wholestagecodegen: Best/Avg Time(ms)  Rate(M/s)  Per Row(ns)  Relative
-    ---------------------------------------------------------------------------------------------
-    codegen = F                                       462 /  533        1.4        704.4      1.0X
-    codegen = T maxLinesPerFunction = 1              3444 / 3447        0.2       5255.3      0.1X
-    codegen = T maxLinesPerFunction = 1500            447 /  478        1.5        682.1      1.0X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
+    Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+
+    max function bytecode size:               Best/Avg Time(ms)  Rate(M/s)  Per Row(ns)  Relative
+    ---------------------------------------------------------------------------------------------
+    hugeMethodLimit = 8000                           1043 / 1159        0.6       1591.5      1.0X
--- End diff --

yea, you're right; I'll update and thanks
[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19083#discussion_r142576983

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---
@@ -333,33 +334,28 @@ class AggregateBenchmark extends BenchmarkBase {
       .sum()
       .collect()

-    benchmark.addCase(s"codegen = F") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
+    benchmark.addCase(s"hugeMethodLimit = 8000") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "8000")
       f()
     }

-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 1") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1")
-      f()
-    }
-
-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 1500") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1500")
+    benchmark.addCase(s"hugeMethodLimit = 16000") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "16000")
       f()
     }

     benchmark.run()

     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1
-    Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
-    max function length of wholestagecodegen: Best/Avg Time(ms)  Rate(M/s)  Per Row(ns)  Relative
-    ---------------------------------------------------------------------------------------------
-    codegen = F                                       462 /  533        1.4        704.4      1.0X
-    codegen = T maxLinesPerFunction = 1              3444 / 3447        0.2       5255.3      0.1X
-    codegen = T maxLinesPerFunction = 1500            447 /  478        1.5        682.1      1.0X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
+    Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+
+    max function bytecode size:               Best/Avg Time(ms)  Rate(M/s)  Per Row(ns)  Relative
+    ---------------------------------------------------------------------------------------------
+    hugeMethodLimit = 8000                           1043 / 1159        0.6       1591.5      1.0X
--- End diff --

The original `codegen = F` case is removed? I think it is reasonable to compare with it.
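A sketch of the baseline case the reviewer asks to restore, reusing `benchmark`, `sparkSession`, and `f` exactly as they appear in the diff above; whether the final PR re-added it in this form is an assumption:

```scala
// Restore the codegen-off baseline so the hugeMethodLimit cases have a reference point.
benchmark.addCase("codegen = F") { _ =>
  sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
  f()
}
```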
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142576602

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -203,6 +203,10 @@ package object config {
   private[spark] val HISTORY_UI_MAX_APPS =
     ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)

+  private[spark] val UI_SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress")
--- End diff --

While updating the PR, I noticed that `internal/config/package.scala` looks more natural for this. Like [spark.ui.retainedTasks](https://github.com/apache/spark/pull/19061/files#diff-6bdad48cfc34314e89599655442ff210R198), I added it here.
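The diff above is cut off mid-definition. A plausible completion following the `ConfigBuilder` pattern used just before it; the doc string and default value here are assumptions, not the PR's final code:

```scala
// Hypothetical completion of the truncated entry.
private[spark] val UI_SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress")
  .doc("Whether to show the progress bar in the console.")
  .booleanConf
  .createWithDefault(false)
```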
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19083 fixed @gatorsmile
[GitHub] spark issue #19061: [SPARK-21568][CORE] ConsoleProgressBar should only be en...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19061 **[Test build #82441 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82441/testReport)** for PR 19061 at commit [`e2158ad`](https://github.com/apache/spark/commit/e2158ad13a5ff185bbe9dd26e4c00bc75db9dcfb).
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142571627

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None
--- End diff --

indent
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142572013

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None

-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
         val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+      v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
+    val batchResult = statsSum *:* expElogbeta.t
     // Note that this is an optimization to avoid batch.count
-    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-    if (optimizeDocConcentration) updateAlpha(gammat)
+    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
--- End diff --

Instead of batchSize, I think we may use nonEmptyDocsN directly. @jkbradley please double check since it will be a behavior change. Also please notice that nonEmptyDocsN can be zero, so be careful about dividing by 0.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142574222

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None

-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
         val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+      v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
+    val batchResult = statsSum *:* expElogbeta.t
     // Note that this is an optimization to avoid batch.count
-    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-    if (optimizeDocConcentration) updateAlpha(gammat)
+    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+    updateLambda(batchResult, batchSize)
+
+    logphatOption.foreach(_ /= batchSize.toDouble)
+    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+
+    expElogbetaBc.destroy(false)
+
     this
   }

   /**
-   * Update lambda based on the batch submitted. batchSize can be different for each iteration.
+   * Update lambda based on the batch submitted. nonEmptyDocsN can be different for each iteration.
--- End diff --

comments should be consistent with code. Update code or revert comment change.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142574453

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None

-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
         val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+      v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
+      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+        elementWiseSum, elementWiseSum
+      )
+    val batchResult = statsSum *:* expElogbeta.t
     // Note that this is an optimization to avoid batch.count
-    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
-    if (optimizeDocConcentration) updateAlpha(gammat)
+    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+    updateLambda(batchResult, batchSize)
+
+    logphatOption.foreach(_ /= batchSize.toDouble)
--- End diff --

Should use nonEmptyDocsN to be consistent with the original implementation, and also to avoid dividing by 0.
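The divide-by-zero concern is concrete: on a batch with no non-empty documents the accumulated logphat stays all zeros, and `0.0 / 0` is `NaN`. A self-contained sketch (plain Scala; names are illustrative, not the PR's code) of the kind of guard the reviewer is suggesting:

```scala
object GuardedNormalize {
  // Normalize the accumulated logphat by the non-empty document count,
  // skipping the update entirely when the batch contained no documents.
  def normalize(logphatSum: Array[Double], nonEmptyDocsN: Long): Option[Array[Double]] =
    if (nonEmptyDocsN > 0) Some(logphatSum.map(_ / nonEmptyDocsN)) else None

  def main(args: Array[String]): Unit = {
    // Unguarded division on an all-empty batch: 0.0 / 0 == NaN.
    println(Array(0.0, 0.0).map(_ / 0L).toList)           // List(NaN, NaN)
    println(normalize(Array(0.0, 0.0), 0L))               // None: skip the alpha update
    println(normalize(Array(1.0, 2.0), 2L).map(_.toList)) // Some(List(0.5, 1.0))
  }
}
```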
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142571342

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
--- End diff --

If it's only used once, reassigning it to a local variable is not necessary.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142571728

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+      else None

-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount : Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
         val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
-    expElogbetaBc.destroy(false)
-    val batchResult = statsSum *:* expElogbeta.t
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
+      v : (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
--- End diff --

extra space after nonEmptyDocsN
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142571603 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration --- End diff -- The comment is not that accurate. If `optimizeDocConcentration==false`, logphat will not be calculated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
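[Editor's note] A compact sketch of the behavior the reviewer is describing, with names borrowed from the diff: when `optimizeDocConcentration` is false the buffer is `None`, so logphat is never computed at all, not merely computed and unused. The value of `k` and the update expression are illustrative:

```scala
import breeze.linalg.{DenseVector => BDV}

val k = 10
val optimizeDocConcentration = false

// The per-partition buffer exists only when docConcentration is optimized.
val logphatPartOptionBase = () =>
  if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None

val logphatPartOption = logphatPartOptionBase()

// When the Option is None, foreach never runs its body, so the update
// expression below is never even evaluated: logphat is skipped entirely.
logphatPartOption.foreach(_ += BDV.rand(k))
```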
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r142571685 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer { val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta) val alpha = this.alpha.asBreeze val gammaShape = this.gammaShape +val optimizeDocConcentration = this.optimizeDocConcentration +// We calculate logphat in the same pass as other statistics, but we only need +// it if we are optimizing docConcentration +val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) + else None -val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs => +val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs => val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0) val stat = BDM.zeros[Double](k, vocabSize) - var gammaPart = List[BDV[Double]]() + val logphatPartOption = logphatPartOptionBase() + var nonEmptyDocCount : Long = 0L nonEmptyDocs.foreach { case (_, termCounts: Vector) => +nonEmptyDocCount += 1 val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference( termCounts, expElogbetaBc.value, alpha, gammaShape, k) -stat(::, ids) := stat(::, ids).toDenseMatrix + sstats -gammaPart = gammad :: gammaPart +stat(::, ids) := stat(::, ids) + sstats +logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad)) } - Iterator((stat, gammaPart)) -}.persist(StorageLevel.MEMORY_AND_DISK) -val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( - _ += _, _ += _) -val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( - stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) -stats.unpersist() -expElogbetaBc.destroy(false) -val batchResult = statsSum *:* expElogbeta.t + Iterator((stat, logphatPartOption, nonEmptyDocCount)) +} + +val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long), + v : (BDM[Double], Option[BDV[Double]], Long)) => { --- End diff -- indent --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19327 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19327 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82436/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19327 **[Test build #82436 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82436/testReport)** for PR 19327 at commit [`6d06374`](https://github.com/apache/spark/commit/6d063745a1501277018b04ef264e1e72949dc107). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18732 **[Test build #82440 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82440/testReport)** for PR 18732 at commit [`21fed0d`](https://github.com/apache/spark/commit/21fed0dfeefe5775d193d7bb4176c38f0c2b91eb). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142572643 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -44,14 +63,22 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex .map { case (attr, i) => attr.withName(s"_$i") }) +val batchSize = conf.arrowMaxRecordsPerBatch + +val batchIter = if (batchSize > 0) { + new BatchIterator(iter, batchSize) +} else { + Iterator(iter) +} --- End diff -- I changed it to one line; I think it's more concise. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
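[Editor's note] Presumably the one-line form is the conditional collapsed into a single expression. A sketch with a minimal stand-in for `BatchIterator` (the real class lives alongside `ArrowEvalPythonExec` and is more careful than this):

```scala
// Minimal stand-in for BatchIterator: chunks an iterator into fixed-size
// batches. It assumes each batch is fully consumed before the next is requested.
class BatchIterator[T](iter: Iterator[T], batchSize: Int) extends Iterator[Iterator[T]] {
  override def hasNext: Boolean = iter.hasNext
  override def next(): Iterator[T] = iter.take(batchSize)
}

val iter = (1 to 10).iterator
val batchSize = 4

// The one-line form under discussion: batch when a positive size is
// configured, otherwise hand the whole input over as a single batch.
val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
```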
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142572356 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType */ @InterfaceStability.Stable class RelationalGroupedDataset protected[sql]( -df: DataFrame, +val df: DataFrame, --- End diff -- Reverted. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19404 **[Test build #82439 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82439/testReport)** for PR 19404 at commit [`a2d5bc7`](https://github.com/apache/spark/commit/a2d5bc706987accaeb6c9516e7d4a07b5bb3104f). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19423: Branch 2.2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19423 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142571653 --- Diff: python/pyspark/worker.py --- @@ -74,17 +75,37 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +import pyarrow as pa --- End diff -- Removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142571660 --- Diff: python/pyspark/worker.py --- @@ -74,17 +75,37 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +import pyarrow as pa + +arrow_return_types = list(to_arrow_type(field.dataType) for field in return_type) --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19083 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19083 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82435/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19423: Branch 2.2
GitHub user engineeyao opened a pull request: https://github.com/apache/spark/pull/19423 Branch 2.2 ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/apache/spark branch-2.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19423.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19423 commit 9cbf39f1c74f16483865cd93d6ffc3c521e878a7 Author: Yanbo Liang Date: 2017-05-25T12:15:15Z [SPARK-19281][FOLLOWUP][ML] Minor fix for PySpark FPGrowth. ## What changes were proposed in this pull request? Follow-up for #17218, some minor fix for PySpark ```FPGrowth```. ## How was this patch tested? Existing UT. Author: Yanbo Liang Closes #18089 from yanboliang/spark-19281. (cherry picked from commit 913a6bfe4b0eb6b80a03b858ab4b2767194103de) Signed-off-by: Yanbo Liang commit e01f1f222bcb7c469b1e1595e9338ed478d99894 Author: Yan Facai (颜发才) Date: 2017-05-25T13:40:39Z [SPARK-20768][PYSPARK][ML] Expose numPartitions (expert) param of PySpark FPGrowth. ## What changes were proposed in this pull request? Expose numPartitions (expert) param of PySpark FPGrowth. ## How was this patch tested? + [x] Pass all unit tests. Author: Yan Facai (颜发才) Closes #18058 from facaiy/ENH/pyspark_fpg_add_num_partition. (cherry picked from commit 139da116f130ed21481d3e9bdee5df4b8d7760ac) Signed-off-by: Yanbo Liang commit 022a4957d8dc8d6049e0a8c9191fcfd1bd95a4a4 Author: Lior Regev Date: 2017-05-25T16:08:19Z [SPARK-20741][SPARK SUBMIT] Added cleanup of JARs archive generated by SparkSubmit ## What changes were proposed in this pull request? Deleted generated JARs archive after distribution to HDFS ## How was this patch tested? Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Lior Regev Closes #17986 from liorregev/master. (cherry picked from commit 7306d556903c832984c7f34f1e8fe738a4b2343c) Signed-off-by: Sean Owen commit 5ae1c652147aba9c5087335b0c6916a1035090b2 Author: hyukjinkwon Date: 2017-05-25T16:10:30Z [SPARK-19707][SPARK-18922][TESTS][SQL][CORE] Fix test failures/the invalid path check for sc.addJar on Windows ## What changes were proposed in this pull request? This PR proposes two things: - A follow up for SPARK-19707 (Improving the invalid path check for sc.addJar on Windows as well). ``` org.apache.spark.SparkContextSuite: - add jar with invalid path *** FAILED *** (32 milliseconds) 2 was not equal to 1 (SparkContextSuite.scala:309) ... ``` - Fix path vs URI related test failures on Windows. ``` org.apache.spark.storage.LocalDirsSuite: - SPARK_LOCAL_DIRS override also affects driver *** FAILED *** (0 milliseconds) new java.io.File("/NONEXISTENT_PATH").exists() was true (LocalDirsSuite.scala:50) ... - Utils.getLocalDir() throws an exception if any temporary directory cannot be retrieved *** FAILED *** (15 milliseconds) Expected exception java.io.IOException to be thrown, but no exception was thrown. (LocalDirsSuite.scala:64) ... 
``` ``` org.apache.spark.sql.hive.HiveSchemaInferenceSuite: - orc: schema should be inferred and saved when INFER_AND_SAVE is specified *** FAILED *** (203 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-dae61ab3-a851-4dd3-bf4e-be97c501f254 ... - parquet: schema should be inferred and saved when INFER_AND_SAVE is specified *** FAILED *** (203 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-fa3aff89-a66e-4376-9a37-2a9b87596939 ... - orc: schema should be inferred but not stored when INFER_ONLY is specified *** FAILED *** (141 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-fb464e59-b049-481b-9c75-f53295c9fc2c ... - parquet: schema should be inferred but not stored when INFER_ONLY is specified *** FAILED *** (125 millis
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19083 **[Test build #82435 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82435/testReport)** for PR 19083 at commit [`dfde49b`](https://github.com/apache/spark/commit/dfde49bcc487ecbc0135cd301e8d9c3ad17921be). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142571432 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// In case of shells, spark.ui.showConsoleProgress can be true by default or by user. +if (isShell(args.primaryResource)) { + if (!sparkConf.contains("spark.ui.showConsoleProgress")) { --- End diff -- Thanks! Then, I'll create a `deploy/config.scala` and put it there. The following are the existing `config.scala` files, but they seem unrelated to `deploy/SparkSubmit.scala`. ``` $ find . -name config.scala ./core/src/main/scala/org/apache/spark/deploy/history/config.scala ./resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala ./resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
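[Editor's note] For reference, such a constant would presumably follow Spark's `ConfigBuilder` pattern, as the existing `config.scala` files listed above do. A sketch only; the file location, doc text, and default value are assumptions, not the patch that was merged:

```scala
package org.apache.spark.deploy

import org.apache.spark.internal.config.ConfigBuilder

// Sketch of a config constant for the console progress bar, mirroring the
// style of the existing config.scala files in the Spark codebase.
private[deploy] object config {
  val UI_SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress")
    .doc("When true, show the progress bar in the console.")
    .booleanConf
    .createWithDefault(false)
}
```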
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142571075 --- Diff: python/pyspark/worker.py --- @@ -32,8 +32,9 @@ from pyspark.serializers import write_with_length, write_int, read_long, \ write_long, read_int, SpecialLengths, PythonEvalType, UTF8Deserializer, PickleSerializer, \ BatchedSerializer, ArrowStreamPandasSerializer -from pyspark.sql.types import toArrowType +from pyspark.sql.types import to_arrow_type from pyspark import shuffle +from pyspark.sql.types import StructType --- End diff -- Good catch. Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142571047 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import pandas_udf, array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret + +foo_udf = pandas_udf( +foo, +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142571038 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import pandas_udf, array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret + +foo_udf = pandas_udf( +foo, +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_coerce(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v=df.v + 1) +return ret + +@pandas_udf(StructType([StructField('id', LongType()), StructField('v', DoubleType())])) +def foo(df): --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142571056 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3377,133 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import pandas_udf, array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142570731 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// In case of shells, spark.ui.showConsoleProgress can be true by default or by user. +if (isShell(args.primaryResource)) { + if (!sparkConf.contains("spark.ui.showConsoleProgress")) { +sysProps("spark.ui.showConsoleProgress") = "true" + } +} else { + sysProps("spark.ui.showConsoleProgress") = "false" --- End diff -- Yep. I see. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142570436 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// In case of shells, spark.ui.showConsoleProgress can be true by default or by user. +if (isShell(args.primaryResource)) { + if (!sparkConf.contains("spark.ui.showConsoleProgress")) { +sysProps("spark.ui.showConsoleProgress") = "true" + } +} else { + sysProps("spark.ui.showConsoleProgress") = "false" --- End diff -- I don't see a need to prevent users from forcing this configuration if they want to. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142570378 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// In case of shells, spark.ui.showConsoleProgress can be true by default or by user. +if (isShell(args.primaryResource)) { + if (!sparkConf.contains("spark.ui.showConsoleProgress")) { --- End diff -- It should be added to the config package object like all other constants in core. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18732 **[Test build #82438 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82438/testReport)** for PR 18732 at commit [`4943ceb`](https://github.com/apache/spark/commit/4943ceb8b57041a7bf911e8f8637380c5fc263ff). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142570193 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// In case of shells, spark.ui.showConsoleProgress can be true by default or by user. +if (isShell(args.primaryResource)) { + if (!sparkConf.contains("spark.ui.showConsoleProgress")) { +sysProps("spark.ui.showConsoleProgress") = "true" + } +} else { + sysProps("spark.ui.showConsoleProgress") = "false" --- End diff -- Originally, I preferred blocking it here. Please let me know if this should be removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142570065 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// In case of shells, spark.ui.showConsoleProgress can be true by default or by user. +if (isShell(args.primaryResource)) { + if (!sparkConf.contains("spark.ui.showConsoleProgress")) { +sysProps("spark.ui.showConsoleProgress") = "true" + } +} else { + sysProps("spark.ui.showConsoleProgress") = "false" --- End diff -- Or, yes, it can be removed, because `SparkContext` also accepts the user's configuration. In that case, you're right: this is not needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142569400 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// In case of shells, spark.ui.showConsoleProgress can be true by default or by user. +if (isShell(args.primaryResource)) { + if (!sparkConf.contains("spark.ui.showConsoleProgress")) { --- End diff -- Ur, @vanzin. It seems there is no pre-defined config for `spark.ui.XXX`. Is that a convention in Apache Spark? Which class would you prefer I add the new config to? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142569029 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// In case of shells, spark.ui.showConsoleProgress can be true by default or by user. +if (isShell(args.primaryResource)) { + if (!sparkConf.contains("spark.ui.showConsoleProgress")) { +sysProps("spark.ui.showConsoleProgress") = "true" + } +} else { + sysProps("spark.ui.showConsoleProgress") = "false" --- End diff -- This will prevent user mistakes by overriding a user-supplied `spark.ui.showConsoleProgress=true` in the `SparkSubmit` CLI. In the `SparkSubmit` case, only shells will accept the user's `spark.ui.showConsoleProgress`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142568710 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +// In case of shells, spark.ui.showConsoleProgress can be true by default or by user. +if (isShell(args.primaryResource)) { + if (!sparkConf.contains("spark.ui.showConsoleProgress")) { --- End diff -- Thank you, @vanzin. I'll create a new config for this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19083 **[Test build #82437 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82437/testReport)** for PR 19083 at commit [`fca22b7`](https://github.com/apache/spark/commit/fca22b767fddb061303cddd4e06c87130b1b32dc). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19083 Thanks, I'll update soon --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19416#discussion_r142562531 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow} +import org.apache.spark.sql.execution.ObjectOperator +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP +import org.apache.spark.sql.types.{IntegerType, LongType, StructType} + + +class FlatMapGroupsWithState_StateManager( --- End diff -- Yes it is. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142561980 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. --- End diff -- nvm, I was thinking about it incorrectly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142561534 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. --- End diff -- Can the returned `pandas.DataFrame` be of arbitrary length? Since we apply it on a `GroupedData`, I'd expect this `apply` to work like an aggregation such as `count` or `avg`, so shouldn't the returned `pandas.DataFrame` have length 1? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
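[Editor's note] The Scala Dataset API makes the distinction behind this question concrete: the proposed `apply` behaves like `flatMapGroups` rather than an aggregation, so each group may produce any number of output rows. A small illustrative sketch; the data and session setup are stand-ins:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
import spark.implicits._

val ds = Seq((1, 10), (1, 20), (2, 30)).toDS()

// Unlike count/avg, which emit exactly one row per group, a group-wise
// "apply" may emit any number of rows: here, every input row plus a
// per-group total row.
val out = ds.groupByKey(_._1).flatMapGroups { (key, rows) =>
  val values = rows.map(_._2).toSeq
  val totalRow = (key, values.sum)
  values.map(v => (key, v)) :+ totalRow
}
out.show()
```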
[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/18924 Oh, sorry about that; it is waiting for @jkbradley to merge it. Don't worry, I will contact him! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18801: SPARK-10878 Fix race condition when multiple clients res...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/18801 gentle ping @Victsm --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19327 **[Test build #82436 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82436/testReport)** for PR 19327 at commit [`6d06374`](https://github.com/apache/spark/commit/6d063745a1501277018b04ef264e1e72949dc107). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19083 A few minor comments; otherwise LGTM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19404 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19404 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82434/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19404 **[Test build #82434 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82434/testReport)** for PR 19404 at commit [`f945f39`](https://github.com/apache/spark/commit/f945f39438c6a1cabff3f18489dd2082c4cba24c). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org