[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21536 **[Test build #91746 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91746/testReport)** for PR 21536 at commit [`2ea2181`](https://github.com/apache/spark/commit/2ea2181697038dbd2109f2daeb347d98724b93af).
[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21536 Merged build finished. Test PASSed.
[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21536 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3966/
[GitHub] spark issue #21428: [SPARK-24235][SS] Implement continuous shuffle writer fo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21428 **[Test build #91745 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91745/testReport)** for PR 21428 at commit [`4bbdeae`](https://github.com/apache/spark/commit/4bbdeae3b9d7d1955593dab360422dd59800d54f).
[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21536 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/76/
[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21536 Merged build finished. Test PASSed.
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r194963031

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -183,34 +182,131 @@ private[sql] object ArrowConverters {
   }

   /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
    */
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
       batchBytes: Array[Byte],
       allocator: BufferAllocator): ArrowRecordBatch = {
-    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-    val reader = new ArrowFileReader(in, allocator)
-
-    // Read a batch from a byte stream, ensure the reader is closed
-    Utils.tryWithSafeFinally {
-      val root = reader.getVectorSchemaRoot // throws IOException
-      val unloader = new VectorUnloader(root)
-      reader.loadNextBatch() // throws IOException
-      unloader.getRecordBatch
-    } {
-      reader.close()
-    }
+    val in = new ByteArrayInputStream(batchBytes)
+    MessageSerializer.deserializeMessageBatch(new ReadChannel(Channels.newChannel(in)), allocator)
+      .asInstanceOf[ArrowRecordBatch] // throws IOException
   }

+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    val rdd = payloadRDD.rdd.mapPartitions { iter =>
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
       val context = TaskContext.get()
-      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
+      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
     }
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and return an RDD of serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(sqlContext: SQLContext, filename: String):
+      JavaRDD[Array[Byte]] = {
--- End diff --

indentation
[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/21536 retest this please
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r194962850

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
@@ -34,8 +34,10 @@ case class ContinuousShuffleReadPartition(
   // Initialized only on the executor, and only once even as we call compute() multiple times.
   lazy val (reader: ContinuousShuffleReader, endpoint) = {
     val env = SparkEnv.get.rpcEnv
-    val receiver = new UnsafeRowReceiver(queueSize, numShuffleWriters, epochIntervalMs, env)
-    val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID()}", receiver)
+    val receiver = new RPCContinuousShuffleReader(
+      queueSize, numShuffleWriters, epochIntervalMs, env)
+    val endpoint = env.setupEndpoint(s"RPCContinuousShuffleReader-${UUID.randomUUID()}", receiver)
--- End diff --

It requires a reasonable amount of extra code. As mentioned, this is not the final shuffle mechanism (and I intend to have the TCP-based shuffle ready to go in the next Spark release).
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r194962429

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala ---
@@ -68,7 +66,7 @@ private[shuffle] class UnsafeRowReceiver(
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case r: UnsafeRowReceiverMessage =>
+    case r: RPCContinuousShuffleMessage =>
       queues(r.writerId).put(r)
--- End diff --

That's a very strange characteristic for an RPC framework. I don't know what backpressure could mean other than a receiver blocking a sender from sending more data. In any case, the final shuffle mechanism isn't going to use the RPC framework, so I added a reference to it. (We can discuss in a later PR whether we want to leave this mechanism lying around or remove it once we're confident the TCP-based one is working.)
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r194962360

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala ---
@@ -34,17 +34,36 @@ private[sql] object PythonSQLUtils {
   }

   /**
-   * Python Callable function to convert ArrowPayloads into a [[DataFrame]].
+   * Python callable function to convert an RDD of serialized ArrowRecordBatches into
+   * a [[DataFrame]].
    *
-   * @param payloadRDD A JavaRDD of ArrowPayloads.
-   * @param schemaString JSON Formatted Schema for ArrowPayloads.
+   * @param arrowBatchRDD A JavaRDD of serialized ArrowRecordBatches.
+   * @param schemaString JSON Formatted Spark schema for Arrow batches.
    * @param sqlContext The active [[SQLContext]].
    * @return The converted [[DataFrame]].
    */
-  def arrowPayloadToDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+  def arrowStreamToDataFrame(
--- End diff --

This seems to be a private method now?
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21537 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/75/
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21537 **[Test build #91744 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91744/testReport)** for PR 21537 at commit [`531faf4`](https://github.com/apache/spark/commit/531faf4aad4c942efb826fe7ca2ba8a7f1e5cf3f).
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21537 Merged build finished. Test PASSed.
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21537 Merged build finished. Test PASSed.
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21537 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3965/
[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21503#discussion_r194961443

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}

 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

@rdblue do you have time to prepare a PR for the 2nd proposal? I can do that too if you are busy with other stuff.
[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21537#discussion_r194961281

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala ---
@@ -625,25 +625,23 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
     val eval = child.genCode(ctx)
     val nullSafeCast = nullSafeCastFunction(child.dataType, dataType, ctx)

-    ev.copy(code =
+    ev.copy(code = eval.code + code"""
--- End diff --

Oh. Yes.
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21537 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/74/
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21537 Merged build finished. Test PASSed.
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21537 Merged build finished. Test PASSed.
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21537 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3964/
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21537 **[Test build #91743 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91743/testReport)** for PR 21537 at commit [`7f486fe`](https://github.com/apache/spark/commit/7f486fe8f76b0b5a2c1784e211ff95633968db59).
[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21537#discussion_r194960933

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -579,6 +579,22 @@ class CodegenContext {
     s"${fullName}_$id"
   }

+  /**
+   * Creates an `ExprValue` representing a local java variable of required data type.
+   */
+  def freshName(name: String, dt: DataType): VariableValue = JavaCode.variable(freshName(name), dt)
--- End diff --

Sounds good. Not sure if @cloud-fan wants me to change it in this PR or another one: https://github.com/apache/spark/pull/21537#discussion_r194841055.
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Merged build finished. Test PASSed.
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91739/
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21547 **[Test build #91739 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91739/testReport)** for PR 21547 at commit [`0f37c94`](https://github.com/apache/spark/commit/0f37c94046e8bc6c0c96cb150b0c7d8ad99f72a2).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21537#discussion_r194958976

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -579,6 +579,22 @@ class CodegenContext {
     s"${fullName}_$id"
   }

+  /**
+   * Creates an `ExprValue` representing a local java variable of required data type.
+   */
+  def freshName(name: String, dt: DataType): VariableValue = JavaCode.variable(freshName(name), dt)
+
+  /**
+   * Creates an `ExprValue` representing a local java variable of required data type.
+   */
+  def freshName(name: String, javaClass: Class[_]): VariableValue =
+    JavaCode.variable(freshName(name), javaClass)
+
+  /**
+   * Creates an `ExprValue` representing a local boolean java variable.
+   */
+  def isNullFreshName(name: String): VariableValue = JavaCode.isNullVariable(freshName(name))
--- End diff --

hmm, don't we want to do this in this PR? I can change it to `freshName(name, BooleanType)` together with the changes for the other comments (a usage sketch follows below).
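A minimal usage sketch of the replacement viirya describes, assuming `ctx` is a `CodegenContext` with the typed `freshName` overload from the diff and `BooleanType` in scope; this is illustrative, not code from the PR:

```scala
// Instead of the dedicated helper:
//   val isNull = ctx.isNullFreshName("isNull")
// the typed overload covers the boolean is-null flag directly:
val isNull = ctx.freshName("isNull", BooleanType)
```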
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r194958139

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances.
+ *
+ * @param writerId The partition ID of this writer.
+ * @param outputPartitioner The partitioner on the reader side of the shuffle.
+ * @param endpoints The [[RPCContinuousShuffleReader]] endpoints to write to. Indexed by
+ *                  partition ID within outputPartitioner.
+ */
+class RPCContinuousShuffleWriter(
+    writerId: Int,
+    outputPartitioner: Partitioner,
+    endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter {
+
+  if (outputPartitioner.numPartitions != 1) {
+    throw new IllegalArgumentException("multiple readers not yet supported")
+  }
+
+  if (outputPartitioner.numPartitions != endpoints.length) {
+    throw new IllegalArgumentException(s"partitioner size ${outputPartitioner.numPartitions} did " +
+      s"not match endpoint count ${endpoints.length}")
+  }
+
+  def write(epoch: Iterator[UnsafeRow]): Unit = {
+    while (epoch.hasNext) {
+      val row = epoch.next()
+      endpoints(outputPartitioner.getPartition(row)).askSync[Unit](ReceiverRow(writerId, row))
+    }
+
+    endpoints.foreach(_.askSync[Unit](ReceiverEpochMarker(writerId)))
--- End diff --

As far as I understand, the code here sends a ReceiverEpochMarker to each endpoint and waits for all of them to respond. You can send the `ReceiverEpochMarker`s in parallel rather than sending and waiting one endpoint at a time (a sketch follows below).
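A minimal sketch of the parallel variant zsxwing suggests, using the asynchronous `RpcEndpointRef.ask` (which returns a `Future`) so all markers are in flight before any reply is awaited; this is illustrative, not the PR's code:

```scala
import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Fire the epoch marker at every endpoint without waiting in between...
val markerReplies = endpoints.map(_.ask[Unit](ReceiverEpochMarker(writerId)))
// ...then block until every endpoint has acknowledged its marker.
markerReplies.foreach(reply => Await.result(reply, Duration.Inf))
```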
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r194957906

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala ---
@@ -68,7 +66,7 @@ private[shuffle] class UnsafeRowReceiver(
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case r: UnsafeRowReceiverMessage =>
+    case r: RPCContinuousShuffleMessage =>
       queues(r.writerId).put(r)
--- End diff --

All RPC messages inside Spark are processed in [a shared fixed thread pool](https://github.com/apache/spark/blob/v2.3.1/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala#L197), hence we cannot run blocking calls inside an RPC thread. I think we fundamentally need to design a backpressure mechanism in the future, because a receiver cannot block a sender from sending data. For example, even if we block here, we still cannot prevent the sender from sending data, and it will eventually fill the TCP buffer. We cannot just count on TCP backpressure here, as we need to use the same TCP connection in order to support thousands of machines. (A non-blocking alternative is sketched below.)
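A minimal sketch of one non-blocking alternative in the spirit of zsxwing's comment, assuming `queues(r.writerId)` is a `java.util.concurrent.BlockingQueue`; rejecting when the queue is full and pushing the flow-control decision back to the sender is an illustrative design, not what the PR implements:

```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case r: RPCContinuousShuffleMessage =>
    // offer() returns immediately instead of blocking the shared RPC dispatcher thread.
    if (queues(r.writerId).offer(r)) {
      context.reply(())
    } else {
      // Signal the sender that the receiver is saturated; the sender must back off and retry.
      context.sendFailure(new IllegalStateException(s"queue full for writer ${r.writerId}"))
    }
}
```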
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21441 **[Test build #91740 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91740/testReport)** for PR 21441 at commit [`a1b5c36`](https://github.com/apache/spark/commit/a1b5c369623def44288fa62c0e6668624b6b9be8).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21441 Merged build finished. Test FAILed.
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21441 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91740/
[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21497
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r194954051

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3236,13 +3236,49 @@ class Dataset[T] private[sql](
   }

   /**
-   * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark.
+   * Collect a Dataset as Arrow batches and serve stream to PySpark.
    */
   private[sql] def collectAsArrowToPython(): Array[Any] = {
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+
     withAction("collectAsArrowToPython", queryExecution) { plan =>
-      val iter: Iterator[Array[Byte]] =
-        toArrowPayload(plan).collect().iterator.map(_.asPythonSerializable)
-      PythonRDD.serveIterator(iter, "serve-Arrow")
+      PythonRDD.serveToStream("serve-Arrow") { out =>
+        val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
+        val arrowBatchRdd = getArrowBatchRdd(plan)
+        val numPartitions = arrowBatchRdd.partitions.length
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
+        var lastIndex = -1 // index of last partition written
+
+        // Handler to eagerly write partitions to Python in order
+        def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
+          // If result is from next partition in order
+          if (index - 1 == lastIndex) {
+            batchWriter.writeBatches(arrowBatches.iterator)
+            lastIndex += 1
+            // Write stored partitions that come next in order
+            while (lastIndex < results.length && results(lastIndex) != null) {
+              batchWriter.writeBatches(results(lastIndex).iterator)
+              results(lastIndex) = null
+              lastIndex += 1
+            }
+            // After last batch, end the stream
+            if (lastIndex == results.length) {
+              batchWriter.end()
+            }
+          } else {
+            // Store partitions received out of order
+            results(index - 1) = arrowBatches
+          }
+        }
+
+        sparkSession.sparkContext.runJob(
+          arrowBatchRdd,
+          (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
+          0 until numPartitions,
+          handlePartitionBatches)
--- End diff --

+1 on chunking if we can. I recall Bryan said that for grouped UDFs we need the entire set. Also, I'm not sure whether the Python side makes any assumption about how much of the partition is in each chunk (there shouldn't be one?).
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21506
[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21388 ok to test
[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21503#discussion_r194953803

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}

 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

I'm not strongly opposed to any of the options, but 2 would be my choice if I had to pick one. A temporary state where functionality is missing is easier to reason about than temporary states where we deliberately impose a fuzzy lifecycle.
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21506 Merged to master.
[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21497 Merged to master.
[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21497 lgtm
[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21537#discussion_r194953280

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -579,6 +579,22 @@ class CodegenContext {
     s"${fullName}_$id"
   }

+  /**
+   * Creates an `ExprValue` representing a local java variable of required data type.
+   */
+  def freshName(name: String, dt: DataType): VariableValue = JavaCode.variable(freshName(name), dt)
+
+  /**
+   * Creates an `ExprValue` representing a local java variable of required data type.
+   */
+  def freshName(name: String, javaClass: Class[_]): VariableValue =
+    JavaCode.variable(freshName(name), javaClass)
+
+  /**
+   * Creates an `ExprValue` representing a local boolean java variable.
+   */
+  def isNullFreshName(name: String): VariableValue = JavaCode.isNullVariable(freshName(name))
--- End diff --

yea, but we should open another PR to do this.
[GitHub] spark pull request #21533: [SPARK-24195][Core] Bug fix for local:/ path in S...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21533#discussion_r194953025

--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -116,49 +116,52 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   test("basic case for addFile and listFiles") {
     val dir = Utils.createTempDir()

+    // file and absolute path for normal path
     val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
     val absolutePath1 = file1.getAbsolutePath

+    // file and absolute path for relative path
     val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
     val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
     val absolutePath2 = file2.getAbsolutePath

+    // file and absolute path for path with local scheme
+    val file3 = File.createTempFile("someprefix3", "somesuffix3", dir)
+    val localPath = "local://" + file3.getParent + "/../" + file3.getParentFile.getName +
+      "/" + file3.getName
+    val absolutePath3 = file3.getAbsolutePath
+
     try {
       Files.write("somewords1", file1, StandardCharsets.UTF_8)
       Files.write("somewords2", file2, StandardCharsets.UTF_8)
-      val length1 = file1.length()
-      val length2 = file2.length()
+      Files.write("somewords3", file3, StandardCharsets.UTF_8)

-      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-      sc.addFile(file1.getAbsolutePath)
-      sc.addFile(relativePath)
-      sc.parallelize(Array(1), 1).map(x => {
-        val gotten1 = new File(SparkFiles.get(file1.getName))
-        val gotten2 = new File(SparkFiles.get(file2.getName))
-        if (!gotten1.exists()) {
+      def checkGottenFile(file: File, absolutePath: String): Unit = {
+        val length = file.length()
+        val gotten = new File(SparkFiles.get(file.getName))
+        if (!gotten.exists()) {
           throw new SparkException("file doesn't exist : " + absolutePath1)
         }
-        if (!gotten2.exists()) {
-          throw new SparkException("file doesn't exist : " + absolutePath2)
-        }
-        if (length1 != gotten1.length()) {
+        if (file.length() != gotten.length()) {
           throw new SparkException(
-            s"file has different length $length1 than added file ${gotten1.length()} : " +
+            s"file has different length $length than added file ${gotten.length()} : " +
               absolutePath1)
         }
-        if (length2 != gotten2.length()) {
-          throw new SparkException(
-            s"file has different length $length2 than added file ${gotten2.length()} : " +
-              absolutePath2)
-        }
-        if (absolutePath1 == gotten1.getAbsolutePath) {
+        if (absolutePath == gotten.getAbsolutePath) {
           throw new SparkException("file should have been copied :" + absolutePath1)
         }
-        if (absolutePath2 == gotten2.getAbsolutePath) {
-          throw new SparkException("file should have been copied : " + absolutePath2)
-        }
--- End diff --

can we not change the existing test?
[GitHub] spark pull request #21533: [SPARK-24195][Core] Bug fix for local:/ path in S...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21533#discussion_r194953034

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1517,9 +1517,12 @@ class SparkContext(config: SparkConf) extends Logging {
    * only supported for Hadoop-supported filesystems.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
-    val uri = new Path(path).toUri
+    var uri = new Path(path).toUri
     val schemeCorrectedPath = uri.getScheme match {
-      case null | "local" => new File(path).getCanonicalFile.toURI.toString
+      case null | "local" =>
+        // SPARK-24195: Local is not a valid scheme for FileSystem, we should only keep path here.
+        uri = new Path(uri.getPath).toUri
--- End diff --

it changes `uri`, which is referenced again below. (A `val`-based restructuring is sketched below.)
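A minimal sketch of a restructuring that avoids reassigning `uri`, in line with felixcheung's concern; the handling of non-local schemes is abbreviated to a placeholder and is an assumption here, not the PR's actual code:

```scala
def addFile(path: String, recursive: Boolean): Unit = {
  val rawUri = new Path(path).toUri
  // SPARK-24195: strip the "local" scheme up front, so every later use of
  // `uri` already sees the corrected value and no reassignment is needed.
  val uri = rawUri.getScheme match {
    case "local" => new Path(rawUri.getPath).toUri
    case _ => rawUri
  }
  val schemeCorrectedPath = uri.getScheme match {
    case null => new File(uri.getPath).getCanonicalFile.toURI.toString
    case _ => path // other schemes: illustrative placeholder only
  }
  // ... rest of addFile unchanged ...
}
```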
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21506 lgtm
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21504 (Don't block on me - I won't have time to review in detail unless needed, but broadly the PR looks fine)
[GitHub] spark pull request #21370: [SPARK-24215][PySpark] Implement eager evaluation...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21370#discussion_r194952924

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3209,6 +3222,19 @@ class Dataset[T] private[sql](
     }
   }

+  private[sql] def getRowsToPython(
+      _numRows: Int,
+      truncate: Int,
+      vertical: Boolean): Array[Any] = {
+    EvaluatePython.registerPicklers()
+    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val rows = getRows(numRows, truncate, vertical).map(_.toArray).toArray
+    val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType)))
+    val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(
+      rows.iterator.map(toJava))
+    PythonRDD.serveIterator(iter, "serve-GetRows")
--- End diff --

I think we'd better not discuss this here, though.
[GitHub] spark issue #21428: [SPARK-24235][SS] Implement continuous shuffle writer fo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21428 **[Test build #91742 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91742/testReport)** for PR 21428 at commit [`59d6ff7`](https://github.com/apache/spark/commit/59d6ff79de54dc8d99390a056a0133f35667e817).
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r194952174

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala ---
@@ -68,7 +66,7 @@ private[shuffle] class UnsafeRowReceiver(
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case r: UnsafeRowReceiverMessage =>
+    case r: RPCContinuousShuffleMessage =>
       queues(r.writerId).put(r)
--- End diff --

I'm not sure what a critical RPC message is in this context. This line is intended to block forever if the queue is full; the receiver should not take any action or accept any other messages until the queue stops being full.
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21504 **[Test build #91741 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91741/testReport)** for PR 21504 at commit [`bbec6ce`](https://github.com/apache/spark/commit/bbec6ce4c1ffb1f1ba95d62cebb363fd38c6f698).
[GitHub] spark pull request #21370: [SPARK-24215][PySpark] Implement eager evaluation...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21370#discussion_r194951791

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3209,6 +3222,19 @@ class Dataset[T] private[sql](
     }
   }

+  private[sql] def getRowsToPython(
+      _numRows: Int,
+      truncate: Int,
+      vertical: Boolean): Array[Any] = {
+    EvaluatePython.registerPicklers()
+    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val rows = getRows(numRows, truncate, vertical).map(_.toArray).toArray
+    val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType)))
+    val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(
+      rows.iterator.map(toJava))
+    PythonRDD.serveIterator(iter, "serve-GetRows")
--- End diff --

re: the py4j commit - there's a good reason for it, @gatorsmile. Not sure whether the change to the return type is required with the py4j change, though.
[GitHub] spark pull request #21370: [SPARK-24215][PySpark] Implement eager evaluation...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21370#discussion_r194951918

--- Diff: docs/configuration.md ---
@@ -456,6 +456,33 @@ Apart from these, the following properties are also available, and may be useful
 from JVM to Python worker for every task.

+<tr>
+  <td><code>spark.sql.repl.eagerEval.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enable eager evaluation or not. If true and the REPL you are using supports eager evaluation,
--- End diff --

I don't completely follow, actually - what makes a REPL "not support eager evaluation"? In fact, this "eager evaluation" is just the object-rendering support built into REPLs, notebooks, etc. (like print); it's not really designed to be "eager evaluation".
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21441 Merged build finished. Test PASSed.
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21441 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3963/
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21441 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3962/
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21441 **[Test build #91740 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91740/testReport)** for PR 21441 at commit [`a1b5c36`](https://github.com/apache/spark/commit/a1b5c369623def44288fa62c0e6668624b6b9be8).
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21441 Merged build finished. Test PASSed.
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21441 Merged build finished. Test PASSed.
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21441 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/73/
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21441 retest this please
[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21510 Merged build finished. Test PASSed.
[GitHub] spark pull request #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to se...
GitHub user HyukjinKwon reopened a pull request: https://github.com/apache/spark/pull/21441

[DO-NOT-MERGE] Run tests against hadoop-3.1 to see the test failures

## What changes were proposed in this pull request?

Want to see the test results against hadoop-3.1 profile.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark hadoop3-tests

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21441.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21441

commit a1b5c369623def44288fa62c0e6668624b6b9be8
Author: hyukjinkwon
Date: 2018-05-28T07:04:10Z

    Run tests against hadoop-3.1 to see the test failures
[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21510 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91736/
[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21510 **[Test build #91736 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91736/testReport)** for PR 21510 at commit [`3ab14a1`](https://github.com/apache/spark/commit/3ab14a1eb2f8c9d5a50a031425fccb5c17bdd4b6).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #21547: [SPARK-24538][SQL] ByteArrayDecimalType support p...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21547#discussion_r194950951

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -359,6 +359,41 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }

+  test("filter pushdown - decimal(ByteArrayDecimalType)") {
+    val one = new java.math.BigDecimal(1)
+    val two = new java.math.BigDecimal(2)
+    val three = new java.math.BigDecimal(3)
+    val four = new java.math.BigDecimal(4)
--- End diff --

Hm, why don't we follow the style in this file with `implicit class`? (A sketch follows below.)
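A minimal sketch of the implicit-class style the reviewer is pointing at, defined inside the suite; the name `IntToBigDecimal` and the method `bd` are illustrative, not the suite's actual helpers:

```scala
implicit class IntToBigDecimal(private val i: Int) {
  // Lets the test write 1.bd, 2.bd, ... instead of declaring one val per literal.
  def bd: java.math.BigDecimal = new java.math.BigDecimal(i)
}
```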
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r194950709

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala ---
@@ -79,10 +77,10 @@ private[shuffle] class UnsafeRowReceiver(
   private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)

   private val executor = Executors.newFixedThreadPool(numShuffleWriters)
-  private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+  private val completion = new ExecutorCompletionService[RPCContinuousShuffleMessage](executor)
--- End diff --

It cannot be. There's a deadlock scenario where the queue is filled with records from epoch N before all writers have sent the marker for epoch N - 1.
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r194950212

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala ---
@@ -0,0 +1,54 @@
+  def write(epoch: Iterator[UnsafeRow]): Unit = {
+    while (epoch.hasNext) {
+      val row = epoch.next()
+      endpoints(outputPartitioner.getPartition(row)).askSync[Unit](ReceiverRow(writerId, row))
+    }
+
+    endpoints.foreach(_.askSync[Unit](ReceiverEpochMarker(writerId)))
--- End diff --

Sure, but I don't think there's any benefit to doing so. We need to sequence the messages across epochs too, so there's little parallelization available that way.
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21504#discussion_r194950151

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.language.reflectiveCalls
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.StreamingQueryListener._
+
+
+class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+
+  override protected def sparkConf: SparkConf =
+    super.sparkConf.set("spark.sql.streamingQueryListeners",
--- End diff --

seems a mistake here.
[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21529 I think for this PR, apart from the end-to-end test for checking the result, we should also have a unit test in `PlannerSuite` to cover this case. In the follow-up, we can add the config support for the golden file tests and add more test cases.
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21504 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91737/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21504 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21504 **[Test build #91737 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91737/testReport)** for PR 21504 at commit [`b87c90b`](https://github.com/apache/spark/commit/b87c90b6bb356e0faaf8230cb6b20fbcdd65c858). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r194948874

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3236,13 +3236,49 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark.
+   * Collect a Dataset as Arrow batches and serve stream to PySpark.
    */
   private[sql] def collectAsArrowToPython(): Array[Any] = {
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+
     withAction("collectAsArrowToPython", queryExecution) { plan =>
-      val iter: Iterator[Array[Byte]] =
-        toArrowPayload(plan).collect().iterator.map(_.asPythonSerializable)
-      PythonRDD.serveIterator(iter, "serve-Arrow")
+      PythonRDD.serveToStream("serve-Arrow") { out =>
+        val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
+        val arrowBatchRdd = getArrowBatchRdd(plan)
+        val numPartitions = arrowBatchRdd.partitions.length
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
+        var lastIndex = -1  // index of last partition written
+
+        // Handler to eagerly write partitions to Python in order
+        def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
+          // If result is from next partition in order
+          if (index - 1 == lastIndex) {
+            batchWriter.writeBatches(arrowBatches.iterator)
+            lastIndex += 1
+            // Write stored partitions that come next in order
+            while (lastIndex < results.length && results(lastIndex) != null) {
+              batchWriter.writeBatches(results(lastIndex).iterator)
+              results(lastIndex) = null
+              lastIndex += 1
+            }
+            // After last batch, end the stream
+            if (lastIndex == results.length) {
+              batchWriter.end()
+            }
+          } else {
+            // Store partitions received out of order
+            results(index - 1) = arrowBatches
+          }
+        }
+
+        sparkSession.sparkContext.runJob(
+          arrowBatchRdd,
+          (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
+          0 until numPartitions,
+          handlePartitionBatches)
--- End diff --

Instead of collecting all partitions back at once and holding out-of-order partitions in the driver while waiting for the in-order ones, would it be better to incrementally run jobs on the partitions in order and stream them to the Python side? Then we wouldn't need to hold out-of-order partitions in the driver.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
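A sketch of the alternative viirya is proposing: one job per partition, submitted in partition order, so each result can be written immediately and nothing is buffered in the driver. The names (`arrowBatchRdd`, `batchWriter`, `numPartitions`, `sparkSession`) are taken from the diff above; the trade-off is one scheduler round-trip per partition.

```scala
// Run partitions strictly in order; each result is written as it arrives,
// so no out-of-order buffering is needed in the driver.
(0 until numPartitions).foreach { i =>
  val Array(batches) = sparkSession.sparkContext.runJob(
    arrowBatchRdd,
    (it: Iterator[Array[Byte]]) => it.toArray,
    Seq(i))
  batchWriter.writeBatches(batches.iterator)
}
batchWriter.end()
```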
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r194949076

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala ---
@@ -34,17 +34,36 @@ private[sql] object PythonSQLUtils {
   }
 
   /**
-   * Python Callable function to convert ArrowPayloads into a [[DataFrame]].
+   * Python callable function to convert an RDD of serialized ArrowRecordBatches into
+   * a [[DataFrame]].
    *
-   * @param payloadRDD A JavaRDD of ArrowPayloads.
-   * @param schemaString JSON Formatted Schema for ArrowPayloads.
+   * @param arrowBatchRDD A JavaRDD of serialized ArrowRecordBatches.
+   * @param schemaString JSON Formatted Spark schema for Arrow batches.
    * @param sqlContext The active [[SQLContext]].
    * @return The converted [[DataFrame]].
    */
-  def arrowPayloadToDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+  def arrowStreamToDataFrame(
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    ArrowConverters.toDataFrame(payloadRDD, schemaString, sqlContext)
+    ArrowConverters.toDataFrame(arrowBatchRDD, schemaString, sqlContext)
   }
+
+  /**
+   * Python callable function to read a file in Arrow stream format and create a [[DataFrame]]
+   * using each serialized ArrowRecordBatch as a partition.
+   *
+   * @param sqlContext The active [[SQLContext]].
+   * @param filename File to read the Arrow stream from.
+   * @param schemaString JSON Formatted Spark schema for Arrow batches.
+   * @return A new [[DataFrame]].
+   */
+  def arrowReadStreamFromFile(
+      sqlContext: SQLContext,
+      filename: String,
+      schemaString: String): DataFrame = {
+    JavaSparkContext.fromSparkContext(sqlContext.sparkContext)
--- End diff --

What is this line for?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21537#discussion_r194946640

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala ---
@@ -1004,26 +1014,30 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
 
   private[this] def castToIntervalCode(from: DataType): CastFunction = from match {
     case StringType =>
       (c, evPrim, evNull) =>
-        s"""$evPrim = CalendarInterval.fromString($c.toString());
+        code"""$evPrim = CalendarInterval.fromString($c.toString());
           if(${evPrim} == null) {
             ${evNull} = true;
           }
          """.stripMargin
   }
 
-  private[this] def decimalToTimestampCode(d: String): String =
-    s"($d.toBigDecimal().bigDecimal().multiply(new java.math.BigDecimal(1000000L))).longValue()"
-  private[this] def longToTimeStampCode(l: String): String = s"$l * 1000000L"
-  private[this] def timestampToIntegerCode(ts: String): String =
-    s"java.lang.Math.floor((double) $ts / 1000000L)"
-  private[this] def timestampToDoubleCode(ts: String): String = s"$ts / 1000000.0"
+  private[this] def decimalToTimestampCode(d: ExprValue): ExprValue =
+    JavaCode.expression(
+      s"($d.toBigDecimal().bigDecimal().multiply(new java.math.BigDecimal(1000000L))).longValue()",
--- End diff --

hmm, here we should use `Block`. Fixed.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
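For context on the `Block` remark: in the SPARK-24505 work, a fragment built with the `code"..."` interpolator becomes a `Block` that records the `ExprValue`s it embeds, while `JavaCode.expression` wraps a single typed expression. A rough sketch of the distinction, assuming the 2.4-era codegen API; the variable name is illustrative.

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.expressions.codegen.JavaCode
import org.apache.spark.sql.types.LongType

val ts = JavaCode.variable("ts", LongType)
// A Block: a statement-level fragment that tracks `ts` as an input.
val block = code"long seconds = $ts / 1000000L;"
// An ExprValue: a single typed expression, usable where a value is expected.
val expr = JavaCode.expression(s"$ts / 1000000L", LongType)
```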
[GitHub] spark issue #21535: [SPARK-23596][SQL][WIP] Test interpreted path on Dataset...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21535 @hvanhovell I measured the testing time: for the same test, additionally running the interpreted path adds roughly 50-120% more time. Interpreted encoders? Do you mean to test the interpreted path only for `org.apache.spark.sql.catalyst.encoders.*`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
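For reference, the mechanism typically used to force each path is the `spark.sql.codegen.factoryMode` conf, which can pin object generation to `CODEGEN_ONLY` or `NO_CODEGEN`. A sketch of a doubled test body under that assumption; the helper shape is illustrative and assumes a suite with `withSQLConf` and `testImplicits` available, not the PR's actual code.

```scala
// Runs the same encoder-heavy test body once per execution path.
Seq("CODEGEN_ONLY", "NO_CODEGEN").foreach { mode =>
  test(s"Dataset round-trip ($mode)") {
    withSQLConf("spark.sql.codegen.factoryMode" -> mode) {
      val ds = Seq((1, "a"), (2, "b")).toDS()
      assert(ds.map(x => x._1 + 1).collect().toSeq == Seq(2, 3))
    }
  }
}
```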
[GitHub] spark issue #21515: [SPARK-24372][build] Add scripts to help with preparing ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21515 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21515: [SPARK-24372][build] Add scripts to help with preparing ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21515 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91734/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21515: [SPARK-24372][build] Add scripts to help with preparing ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21515 **[Test build #91734 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91734/testReport)** for PR 21515 at commit [`04f6371`](https://github.com/apache/spark/commit/04f6371a3aa0f03ba1c37b5b450bea69923388e9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21539: [SPARK-24500][SQL] Make sure streams are materialized du...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21539 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91735/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21539: [SPARK-24500][SQL] Make sure streams are materialized du...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21539 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3961/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/72/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21539: [SPARK-24500][SQL] Make sure streams are materialized du...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21539 **[Test build #91735 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91735/testReport)** for PR 21539 at commit [`d5832f4`](https://github.com/apache/spark/commit/d5832f4b50d4c8f84feb462291d8da37e87b192f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21547 **[Test build #91739 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91739/testReport)** for PR 21547 at commit [`0f37c94`](https://github.com/apache/spark/commit/0f37c94046e8bc6c0c96cb150b0c7d8ad99f72a2). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/71/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3960/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21547 **[Test build #91738 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91738/testReport)** for PR 21547 at commit [`9606670`](https://github.com/apache/spark/commit/96066701ec75d3caa27994c47eab8ff64150b6a5). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21547 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91738/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21547 **[Test build #91738 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91738/testReport)** for PR 21547 at commit [`9606670`](https://github.com/apache/spark/commit/96066701ec75d3caa27994c47eab8ff64150b6a5). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21547: [SPARK-24538][SQL] ByteArrayDecimalType support p...
GitHub user wangyum opened a pull request: https://github.com/apache/spark/pull/21547 [SPARK-24538][SQL] ByteArrayDecimalType support push down to the data sources

## What changes were proposed in this pull request?

Support pushing [ByteArrayDecimalType](https://github.com/apache/spark/blob/e28eb431146bcdcaf02a6f6c406ca30920592a6a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala#L230) filters down to the data sources.

## How was this patch tested?

Unit tests and manual tests.

**Manual tests**:
```scala
spark.range(1000)
  .selectExpr(
    "id",
    "cast(id as decimal(9)) as d1",
    "cast(id as decimal(9, 2)) as d2",
    "cast(id as decimal(18)) as d3",
    "cast(id as decimal(18, 4)) as d4",
    "cast(id as decimal(38)) as d5",
    "cast(id as decimal(38, 18)) as d6")
  .coalesce(1)
  .write
  .option("parquet.block.size", 1048576)
  .parquet("/tmp/spark/parquet/decimal")

val df = spark.read.parquet("/tmp/spark/parquet/decimal/")
// Reads only about 1 MB of data
df.filter("d6 = 1").show
// Reads 174.3 MB of data
df.filter("d3 = 1").show
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangyum/spark SPARK-24538

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21547.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21547

commit 96066701ec75d3caa27994c47eab8ff64150b6a5
Author: Yuming Wang
Date: 2018-06-13T01:35:55Z

    ByteArrayDecimalType support push down to the data sources

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
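Background on why the wide decimals above (d5, d6) need special handling: Parquet stores a DECIMAL(p, s) with more than 18 digits of precision as a sign-extended FIXED_LEN_BYTE_ARRAY of the unscaled value, so a predicate literal must be encoded the same way before it can be compared against row-group statistics. A hedged sketch of that encoding follows; the byte-width loop follows the Parquet spec, and this is not necessarily the PR's exact code.

```scala
import java.math.{BigDecimal => JBigDecimal}

import org.apache.parquet.io.api.Binary

// Minimum bytes whose signed range can hold `precision` base-10 digits.
def minBytesForPrecision(precision: Int): Int = {
  var numBytes = 1
  while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
    numBytes += 1
  }
  numBytes
}

// Encode a decimal literal in the fixed-length, sign-extended binary form
// Parquet uses for FIXED_LEN_BYTE_ARRAY-backed DECIMAL columns.
def decimalToBinary(precision: Int, decimal: JBigDecimal): Binary = {
  val numBytes = minBytesForPrecision(precision)
  val unscaled = decimal.unscaledValue().toByteArray  // two's complement, big-endian
  if (unscaled.length == numBytes) {
    Binary.fromConstantByteArray(unscaled)
  } else {
    // Sign-extend to the column's fixed width.
    val fixed = new Array[Byte](numBytes)
    val sign: Byte = if (unscaled.head < 0) -1 else 0
    java.util.Arrays.fill(fixed, 0, numBytes - unscaled.length, sign)
    System.arraycopy(unscaled, 0, fixed, numBytes - unscaled.length, unscaled.length)
    Binary.fromConstantByteArray(fixed)
  }
}
```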
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21546 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91733/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21546 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21546 **[Test build #91733 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91733/testReport)** for PR 21546 at commit [`a5a1fbe`](https://github.com/apache/spark/commit/a5a1fbe7121c5b0dd93876a56c29ad17dcd9b168). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21504 **[Test build #91737 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91737/testReport)** for PR 21504 at commit [`b87c90b`](https://github.com/apache/spark/commit/b87c90b6bb356e0faaf8230cb6b20fbcdd65c858). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org