[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21536
  
**[Test build #91746 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91746/testReport)** for PR 21536 at commit [`2ea2181`](https://github.com/apache/spark/commit/2ea2181697038dbd2109f2daeb347d98724b93af).


---




[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21536
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21536
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3966/
Test PASSed.


---




[GitHub] spark issue #21428: [SPARK-24235][SS] Implement continuous shuffle writer fo...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21428
  
**[Test build #91745 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91745/testReport)** for PR 21428 at commit [`4bbdeae`](https://github.com/apache/spark/commit/4bbdeae3b9d7d1955593dab360422dd59800d54f).


---




[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21536
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/76/
Test PASSed.


---




[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21536
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r194963031
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -183,34 +182,131 @@ private[sql] object ArrowConverters {
   }
 
   /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
    */
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
       batchBytes: Array[Byte],
       allocator: BufferAllocator): ArrowRecordBatch = {
-    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-    val reader = new ArrowFileReader(in, allocator)
-
-    // Read a batch from a byte stream, ensure the reader is closed
-    Utils.tryWithSafeFinally {
-      val root = reader.getVectorSchemaRoot  // throws IOException
-      val unloader = new VectorUnloader(root)
-      reader.loadNextBatch()  // throws IOException
-      unloader.getRecordBatch
-    } {
-      reader.close()
-    }
+    val in = new ByteArrayInputStream(batchBytes)
+    MessageSerializer.deserializeMessageBatch(new ReadChannel(Channels.newChannel(in)), allocator)
+      .asInstanceOf[ArrowRecordBatch]  // throws IOException
   }
 
+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    val rdd = payloadRDD.rdd.mapPartitions { iter =>
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
       val context = TaskContext.get()
-      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
+      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
     }
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and return an RDD of serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(sqlContext: SQLContext, filename: String):
+  JavaRDD[Array[Byte]] = {
--- End diff --

indentation


---




[GitHub] spark issue #21536: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInM...

2018-06-12 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/21536
  
retest this please


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-06-12 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r194962850
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
@@ -34,8 +34,10 @@ case class ContinuousShuffleReadPartition(
   // Initialized only on the executor, and only once even as we call compute() multiple times.
   lazy val (reader: ContinuousShuffleReader, endpoint) = {
     val env = SparkEnv.get.rpcEnv
-    val receiver = new UnsafeRowReceiver(queueSize, numShuffleWriters, epochIntervalMs, env)
-    val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID()}", receiver)
+    val receiver = new RPCContinuousShuffleReader(
+      queueSize, numShuffleWriters, epochIntervalMs, env)
+    val endpoint = env.setupEndpoint(s"RPCContinuousShuffleReader-${UUID.randomUUID()}", receiver)
--- End diff --

It requires a reasonable amount of extra code. As mentioned, this is not 
the final shuffle mechanism (and I intend to have the TCP-based shuffle ready 
to go in the next Spark release).


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-06-12 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r194962429
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala ---
@@ -68,7 +66,7 @@ private[shuffle] class UnsafeRowReceiver(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case r: UnsafeRowReceiverMessage =>
+    case r: RPCContinuousShuffleMessage =>
       queues(r.writerId).put(r)
--- End diff --

That's a very strange characteristic for an RPC framework.

I don't know what backpressure could mean other than a receiver blocking a 
sender from sending more data. In any case, the final shuffle mechanism isn't 
going to use the RPC framework, so I added a reference to it. (We can discuss 
in a later PR whether we want to leave this mechanism lying around or remove it 
once we're confident the TCP-based one is working.)


---




[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r194962360
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala ---
@@ -34,17 +34,36 @@ private[sql] object PythonSQLUtils {
   }
 
   /**
-   * Python Callable function to convert ArrowPayloads into a [[DataFrame]].
+   * Python callable function to convert an RDD of serialized ArrowRecordBatches into
+   * a [[DataFrame]].
    *
-   * @param payloadRDD A JavaRDD of ArrowPayloads.
-   * @param schemaString JSON Formatted Schema for ArrowPayloads.
+   * @param arrowBatchRDD A JavaRDD of serialized ArrowRecordBatches.
+   * @param schemaString JSON Formatted Spark schema for Arrow batches.
    * @param sqlContext The active [[SQLContext]].
    * @return The converted [[DataFrame]].
    */
-  def arrowPayloadToDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+  def arrowStreamToDataFrame(
--- End diff --

This seems to be a private method now?


---




[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21537
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/75/
Test PASSed.


---




[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21537
  
**[Test build #91744 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91744/testReport)** for PR 21537 at commit [`531faf4`](https://github.com/apache/spark/commit/531faf4aad4c942efb826fe7ca2ba8a7f1e5cf3f).


---




[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21537
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21537
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21537
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3965/
Test PASSed.


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194961443
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning
+        // alone can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different
+        // column order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the
+        // required projection is more complicated than column pruning, base column pruning
+        // on the set of all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

@rdblue do you have time to prepare a PR for the 2nd proposal? I can do that too if you are busy with other stuff.
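
The pruning decision in the diff above reduces to a set comparison on `AttributeSet`s; a minimal standalone sketch, simplified to the subset check, with two invented columns `a` and `b`:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
import org.apache.spark.sql.types.IntegerType

// Two invented columns; the filter references b, which the projection does not.
val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val projectSet = AttributeSet(a :: Nil)
val filterSet = AttributeSet(a :: b :: Nil)

// Mirrors the branch above: pure column pruning only when the filters stay
// within the projected columns; otherwise push the union of both sets.
val pushed =
  if (filterSet.subsetOf(projectSet)) projectSet.toSeq
  else (projectSet ++ filterSet).toSeq
```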


---




[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...

2018-06-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21537#discussion_r194961281
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala ---
@@ -625,25 +625,23 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
     val eval = child.genCode(ctx)
     val nullSafeCast = nullSafeCastFunction(child.dataType, dataType, ctx)
 
-    ev.copy(code =
+    ev.copy(code = eval.code +
       code"""
--- End diff --

Oh. Yes.


---




[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21537
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/74/
Test PASSed.


---




[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21537
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21537
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21537
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3964/
Test PASSed.


---




[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21537
  
**[Test build #91743 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91743/testReport)** for PR 21537 at commit [`7f486fe`](https://github.com/apache/spark/commit/7f486fe8f76b0b5a2c1784e211ff95633968db59).


---




[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...

2018-06-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21537#discussion_r194960933
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -579,6 +579,22 @@ class CodegenContext {
     s"${fullName}_$id"
   }
 
+  /**
+   * Creates an `ExprValue` representing a local java variable of required data type.
+   */
+  def freshName(name: String, dt: DataType): VariableValue = JavaCode.variable(freshName(name), dt)
--- End diff --

Sounds good. Not sure if @cloud-fan wants me to change it in this PR or in the other one: https://github.com/apache/spark/pull/21537#discussion_r194841055.
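
For context, a hypothetical usage sketch of these helpers (assumes a `ctx: CodegenContext` in scope and the `code` interpolator from this PR series):

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.types.IntegerType

// Hypothetical doGenCode fragment: the typed fresh names carry their java
// type on the ExprValue itself instead of in a raw string.
val count = ctx.freshName("count", IntegerType)   // int local variable
val isNull = ctx.isNullFreshName("isNull")        // boolean null flag
val block = code"""
  int $count = 0;
  boolean $isNull = false;"""
```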


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91739/
Test PASSed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21547
  
**[Test build #91739 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91739/testReport)** for PR 21547 at commit [`0f37c94`](https://github.com/apache/spark/commit/0f37c94046e8bc6c0c96cb150b0c7d8ad99f72a2).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...

2018-06-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21537#discussion_r194958976
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -579,6 +579,22 @@ class CodegenContext {
     s"${fullName}_$id"
   }
 
+  /**
+   * Creates an `ExprValue` representing a local java variable of required data type.
+   */
+  def freshName(name: String, dt: DataType): VariableValue = JavaCode.variable(freshName(name), dt)
+
+  /**
+   * Creates an `ExprValue` representing a local java variable of required data type.
+   */
+  def freshName(name: String, javaClass: Class[_]): VariableValue =
+    JavaCode.variable(freshName(name), javaClass)
+
+  /**
+   * Creates an `ExprValue` representing a local boolean java variable.
+   */
+  def isNullFreshName(name: String): VariableValue = JavaCode.isNullVariable(freshName(name))
--- End diff --

hmm, don't we want to do this in this PR? I can change it to 
`freshName(name, BooleanType)` together with the changes for other comments.


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-06-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r194958139
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances.
+ *
+ * @param writerId          The partition ID of this writer.
+ * @param outputPartitioner The partitioner on the reader side of the shuffle.
+ * @param endpoints         The [[RPCContinuousShuffleReader]] endpoints to write to.
+ *                          Indexed by partition ID within outputPartitioner.
+ */
+class RPCContinuousShuffleWriter(
+    writerId: Int,
+    outputPartitioner: Partitioner,
+    endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter {
+
+  if (outputPartitioner.numPartitions != 1) {
+    throw new IllegalArgumentException("multiple readers not yet supported")
+  }
+
+  if (outputPartitioner.numPartitions != endpoints.length) {
+    throw new IllegalArgumentException(s"partitioner size ${outputPartitioner.numPartitions} did " +
+      s"not match endpoint count ${endpoints.length}")
+  }
+
+  def write(epoch: Iterator[UnsafeRow]): Unit = {
+    while (epoch.hasNext) {
+      val row = epoch.next()
+      endpoints(outputPartitioner.getPartition(row)).askSync[Unit](ReceiverRow(writerId, row))
+    }
+
+    endpoints.foreach(_.askSync[Unit](ReceiverEpochMarker(writerId)))
--- End diff --

As far as I understand, the code here is to send a ReceiverEpochMarker to each endpoint and wait for all of them to respond. You can send the `ReceiverEpochMarker`s in parallel rather than send and wait one by one.
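
A sketch of that parallel variant (`ask` returns a `Future` where `askSync` blocks; the timeout is an arbitrary placeholder, not a value from the PR):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Fire all epoch markers without blocking, then wait once for every ack.
val acks: Seq[Future[Unit]] = endpoints.map(_.ask[Unit](ReceiverEpochMarker(writerId)))
Await.result(Future.sequence(acks), 10.seconds)
```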


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-06-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r194957906
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala ---
@@ -68,7 +66,7 @@ private[shuffle] class UnsafeRowReceiver(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case r: UnsafeRowReceiverMessage =>
+    case r: RPCContinuousShuffleMessage =>
       queues(r.writerId).put(r)
--- End diff --

All RPC messages inside Spark are processed in [a shared fixed thread pool](https://github.com/apache/spark/blob/v2.3.1/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala#L197), hence we cannot run blocking calls inside an RPC thread.

I think we need to design a backpressure mechanism in the future fundamentally, because a receiver cannot block a sender from sending data. For example, even if we block here, we still cannot prevent the sender from sending data, and it will eventually fill up the TCP buffer. We cannot just count on TCP backpressure here, as we need to use the same TCP connection in order to support thousands of machines.
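
A toy illustration of the hazard (not Spark's dispatcher; just a fixed pool plus a bounded queue): once every pool thread is parked in a blocking `put`, nothing else can be dispatched, including whatever would drain the queue.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

val pool = Executors.newFixedThreadPool(2)          // stand-in for the shared RPC pool
val queue = new LinkedBlockingQueue[Int](1)         // bounded, like the receiver queue
(1 to 4).foreach(i => pool.execute(() => queue.put(i)))
// put(1) succeeds; put(2) and put(3) park both threads; put(4) never runs.
pool.shutdown()
println(pool.awaitTermination(1, TimeUnit.SECONDS)) // false: the pool is wedged
```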




---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21441
  
**[Test build #91740 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91740/testReport)** for PR 21441 at commit [`a1b5c36`](https://github.com/apache/spark/commit/a1b5c369623def44288fa62c0e6668624b6b9be8).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21441
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21441
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91740/
Test FAILed.


---




[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21497


---




[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-12 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r194954051
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3236,13 +3236,49 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark.
+   * Collect a Dataset as Arrow batches and serve stream to PySpark.
    */
   private[sql] def collectAsArrowToPython(): Array[Any] = {
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+
     withAction("collectAsArrowToPython", queryExecution) { plan =>
-      val iter: Iterator[Array[Byte]] =
-        toArrowPayload(plan).collect().iterator.map(_.asPythonSerializable)
-      PythonRDD.serveIterator(iter, "serve-Arrow")
+      PythonRDD.serveToStream("serve-Arrow") { out =>
+        val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
+        val arrowBatchRdd = getArrowBatchRdd(plan)
+        val numPartitions = arrowBatchRdd.partitions.length
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
+        var lastIndex = -1  // index of last partition written
+
+        // Handler to eagerly write partitions to Python in order
+        def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
+          // If result is from next partition in order
+          if (index - 1 == lastIndex) {
+            batchWriter.writeBatches(arrowBatches.iterator)
+            lastIndex += 1
+            // Write stored partitions that come next in order
+            while (lastIndex < results.length && results(lastIndex) != null) {
+              batchWriter.writeBatches(results(lastIndex).iterator)
+              results(lastIndex) = null
+              lastIndex += 1
+            }
+            // After last batch, end the stream
+            if (lastIndex == results.length) {
+              batchWriter.end()
+            }
+          } else {
+            // Store partitions received out of order
+            results(index - 1) = arrowBatches
+          }
+        }
+
+        sparkSession.sparkContext.runJob(
+          arrowBatchRdd,
+          (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
+          0 until numPartitions,
+          handlePartitionBatches)
--- End diff --

+1 for chunking if we could. I recall Bryan said that for grouped UDFs we need the entire set.

Also not sure if on the Python side we have any assumption on how much of the partition is in each chunk (there shouldn't be?)
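
The ordering trick in the diff above is independent of Arrow. A self-contained sketch of the same pattern, with a made-up `write` sink standing in for the Arrow stream writer:

```scala
object OrderedPartitionWriter {
  // Hypothetical sink standing in for ArrowBatchStreamWriter.writeBatches.
  def write(data: Array[Byte]): Unit = println(s"wrote partition of ${data.length} bytes")

  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    val buffered = new Array[Array[Byte]](numPartitions) // early arrivals wait here
    var lastWritten = -1
    def onPartition(index: Int, data: Array[Byte]): Unit = synchronized {
      buffered(index) = data
      // Flush the longest contiguous prefix that has arrived so far.
      while (lastWritten + 1 < numPartitions && buffered(lastWritten + 1) != null) {
        lastWritten += 1
        write(buffered(lastWritten))
        buffered(lastWritten) = null // free the buffer once flushed
      }
    }
    // Partitions finish out of order but are still written as 0, 1, 2, 3.
    Seq(2, 0, 3, 1).foreach(i => onPartition(i, Array.fill(i + 1)(0.toByte)))
  }
}
```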



---




[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21506


---




[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...

2018-06-12 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21388
  
ok to test


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194953803
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning
+        // alone can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different
+        // column order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the
+        // required projection is more complicated than column pruning, base column pruning
+        // on the set of all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

I'm not strongly opposed to any of the options, but 2 would be my choice if 
I had to pick one. A temporary state where functionality is missing is easier 
to reason about than temporary states where we deliberately impose a fuzzy 
lifecycle.


---




[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-12 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21506
  
Merged to master.


---




[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-12 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21497
  
Merged to master.


---




[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-12 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21497
  
lgtm


---




[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...

2018-06-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21537#discussion_r194953280
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -579,6 +579,22 @@ class CodegenContext {
     s"${fullName}_$id"
   }
 
+  /**
+   * Creates an `ExprValue` representing a local java variable of required data type.
+   */
+  def freshName(name: String, dt: DataType): VariableValue = JavaCode.variable(freshName(name), dt)
+
+  /**
+   * Creates an `ExprValue` representing a local java variable of required data type.
+   */
+  def freshName(name: String, javaClass: Class[_]): VariableValue =
+    JavaCode.variable(freshName(name), javaClass)
+
+  /**
+   * Creates an `ExprValue` representing a local boolean java variable.
+   */
+  def isNullFreshName(name: String): VariableValue = JavaCode.isNullVariable(freshName(name))
--- End diff --

yea, but we should open another PR to do this.


---




[GitHub] spark pull request #21533: [SPARK-24195][Core] Bug fix for local:/ path in S...

2018-06-12 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21533#discussion_r194953025
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -116,49 +116,52 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   test("basic case for addFile and listFiles") {
     val dir = Utils.createTempDir()
 
+    // file and absolute path for normal path
     val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
     val absolutePath1 = file1.getAbsolutePath
 
+    // file and absolute path for relative path
     val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
     val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
     val absolutePath2 = file2.getAbsolutePath
 
+    // file and absolute path for path with local scheme
+    val file3 = File.createTempFile("someprefix3", "somesuffix3", dir)
+    val localPath = "local://" + file3.getParent + "/../" + file3.getParentFile.getName +
+      "/" + file3.getName
+    val absolutePath3 = file3.getAbsolutePath
+
     try {
       Files.write("somewords1", file1, StandardCharsets.UTF_8)
       Files.write("somewords2", file2, StandardCharsets.UTF_8)
-      val length1 = file1.length()
-      val length2 = file2.length()
+      Files.write("somewords3", file3, StandardCharsets.UTF_8)
 
-      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-      sc.addFile(file1.getAbsolutePath)
-      sc.addFile(relativePath)
-      sc.parallelize(Array(1), 1).map(x => {
-        val gotten1 = new File(SparkFiles.get(file1.getName))
-        val gotten2 = new File(SparkFiles.get(file2.getName))
-        if (!gotten1.exists()) {
+      def checkGottenFile(file: File, absolutePath: String): Unit = {
+        val length = file.length()
+        val gotten = new File(SparkFiles.get(file.getName))
+        if (!gotten.exists()) {
           throw new SparkException("file doesn't exist : " + absolutePath1)
         }
-        if (!gotten2.exists()) {
-          throw new SparkException("file doesn't exist : " + absolutePath2)
-        }
 
-        if (length1 != gotten1.length()) {
+        if (file.length() != gotten.length()) {
           throw new SparkException(
-            s"file has different length $length1 than added file ${gotten1.length()} : " +
+            s"file has different length $length than added file ${gotten.length()} : " +
               absolutePath1)
         }
-        if (length2 != gotten2.length()) {
-          throw new SparkException(
-            s"file has different length $length2 than added file ${gotten2.length()} : " +
-              absolutePath2)
-        }
 
-        if (absolutePath1 == gotten1.getAbsolutePath) {
+        if (absolutePath == gotten.getAbsolutePath) {
           throw new SparkException("file should have been copied :" + absolutePath1)
         }
-        if (absolutePath2 == gotten2.getAbsolutePath) {
-          throw new SparkException("file should have been copied : " + absolutePath2)
-        }
--- End diff --

can we not change the existing test?


---




[GitHub] spark pull request #21533: [SPARK-24195][Core] Bug fix for local:/ path in S...

2018-06-12 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21533#discussion_r194953034
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1517,9 +1517,12 @@ class SparkContext(config: SparkConf) extends Logging {
    * only supported for Hadoop-supported filesystems.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
-    val uri = new Path(path).toUri
+    var uri = new Path(path).toUri
     val schemeCorrectedPath = uri.getScheme match {
-      case null | "local" => new File(path).getCanonicalFile.toURI.toString
+      case null | "local" =>
+        // SPARK-24195: Local is not a valid scheme for FileSystem, we should only keep path here.
+        uri = new Path(uri.getPath).toUri
--- End diff --

it changes `uri` - which is referenced again below.
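
A small sketch of the scheme handling at issue (the jar path is invented; assumes Hadoop's `Path` on the classpath):

```scala
import org.apache.hadoop.fs.Path

// "local:" is not a scheme Hadoop's FileSystem can resolve, so the fix keeps
// only the path component before going through the FileSystem machinery.
val raw = new Path("local:///opt/spark/jars/example.jar").toUri
val stripped = new Path(raw.getPath).toUri
assert(raw.getScheme == "local")
assert(stripped.getScheme == null) // plain path, safe for FileSystem
```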


---




[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-12 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21506
  
lgtm


---




[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-12 Thread jose-torres
Github user jose-torres commented on the issue:

https://github.com/apache/spark/pull/21504
  
(Don't block on me - I won't have time to review in detail unless needed, 
but broadly the PR looks fine)


---




[GitHub] spark pull request #21370: [SPARK-24215][PySpark] Implement eager evaluation...

2018-06-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21370#discussion_r194952924
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3209,6 +3222,19 @@ class Dataset[T] private[sql](
 }
   }
 
+  private[sql] def getRowsToPython(
+      _numRows: Int,
+      truncate: Int,
+      vertical: Boolean): Array[Any] = {
+    EvaluatePython.registerPicklers()
+    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val rows = getRows(numRows, truncate, vertical).map(_.toArray).toArray
+    val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType)))
+    val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(rows.iterator.map(toJava))
+    PythonRDD.serveIterator(iter, "serve-GetRows")
--- End diff --

I think we'd better not talk about this here, though.


---




[GitHub] spark issue #21428: [SPARK-24235][SS] Implement continuous shuffle writer fo...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21428
  
**[Test build #91742 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91742/testReport)** for PR 21428 at commit [`59d6ff7`](https://github.com/apache/spark/commit/59d6ff79de54dc8d99390a056a0133f35667e817).


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-06-12 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r194952174
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala ---
@@ -68,7 +66,7 @@ private[shuffle] class UnsafeRowReceiver(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case r: UnsafeRowReceiverMessage =>
+    case r: RPCContinuousShuffleMessage =>
       queues(r.writerId).put(r)
--- End diff --

I'm not sure what a critical RPC message is in this context. This line is 
intended to block forever if the queue is full; the receiver should not take 
any action or accept any other messages until the queue stops being full.


---




[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21504
  
**[Test build #91741 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91741/testReport)** for PR 21504 at commit [`bbec6ce`](https://github.com/apache/spark/commit/bbec6ce4c1ffb1f1ba95d62cebb363fd38c6f698).


---




[GitHub] spark pull request #21370: [SPARK-24215][PySpark] Implement eager evaluation...

2018-06-12 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21370#discussion_r194951791
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3209,6 +3222,19 @@ class Dataset[T] private[sql](
 }
   }
 
+  private[sql] def getRowsToPython(
+      _numRows: Int,
+      truncate: Int,
+      vertical: Boolean): Array[Any] = {
+    EvaluatePython.registerPicklers()
+    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    val rows = getRows(numRows, truncate, vertical).map(_.toArray).toArray
+    val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType)))
+    val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(rows.iterator.map(toJava))
+    PythonRDD.serveIterator(iter, "serve-GetRows")
--- End diff --

re the py4j commit - there's a good reason for it, @gatorsmile. Not sure if the change to the return type is required with the py4j change, though.


---




[GitHub] spark pull request #21370: [SPARK-24215][PySpark] Implement eager evaluation...

2018-06-12 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21370#discussion_r194951918
  
--- Diff: docs/configuration.md ---
@@ -456,6 +456,33 @@ Apart from these, the following properties are also available, and may be useful
     from JVM to Python worker for every task.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.repl.eagerEval.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enable eager evaluation or not. If true and the REPL you are using supports eager evaluation,

I don't completely follow, actually - what makes a REPL "not support eager evaluation"? In fact, this "eager evaluation" is just object-rendering support built into the REPL, notebook, etc. (like print); it's not really designed to be "eager evaluation".


---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21441
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21441
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3963/
Test PASSed.


---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21441
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3962/
Test PASSed.


---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21441
  
**[Test build #91740 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91740/testReport)** for PR 21441 at commit [`a1b5c36`](https://github.com/apache/spark/commit/a1b5c369623def44288fa62c0e6668624b6b9be8).


---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21441
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21441
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21441
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/73/
Test PASSed.


---




[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-12 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21441
  
retest this please


---




[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21510
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to se...

2018-06-12 Thread HyukjinKwon
GitHub user HyukjinKwon reopened a pull request:

https://github.com/apache/spark/pull/21441

[DO-NOT-MERGE] Run tests against hadoop-3.1 to see the test failures

## What changes were proposed in this pull request?

Want to see the test results against hadoop-3.1 profile.

## How was this patch tested?

N/A


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark hadoop3-tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21441.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21441


commit a1b5c369623def44288fa62c0e6668624b6b9be8
Author: hyukjinkwon 
Date:   2018-05-28T07:04:10Z

Run tests against hadoop-3.1 to see the test failures




---




[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21510
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91736/
Test PASSed.


---




[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21510
  
**[Test build #91736 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91736/testReport)** for PR 21510 at commit [`3ab14a1`](https://github.com/apache/spark/commit/3ab14a1eb2f8c9d5a50a031425fccb5c17bdd4b6).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21547: [SPARK-24538][SQL] ByteArrayDecimalType support p...

2018-06-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21547#discussion_r194950951
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -359,6 +359,41 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
+  test("filter pushdown - decimal(ByteArrayDecimalType)") {
+    val one = new java.math.BigDecimal(1)
+    val two = new java.math.BigDecimal(2)
+    val three = new java.math.BigDecimal(3)
+    val four = new java.math.BigDecimal(4)

Hm, why don't we follow the style in this file with `implicit class`?
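
For reference, the suggested `implicit class` style might look like this sketch (helper name invented, not code from the file):

```scala
// Sketch only: an implicit conversion so the test can write 1.bd, 2.bd, ...
// instead of declaring four BigDecimal locals by hand.
private implicit class IntToJavaBigDecimal(i: Int) {
  def bd: java.math.BigDecimal = new java.math.BigDecimal(i)
}
```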


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-06-12 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r194950709
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala ---
@@ -79,10 +77,10 @@ private[shuffle] class UnsafeRowReceiver(
   private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
 
   private val executor = Executors.newFixedThreadPool(numShuffleWriters)
-  private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+  private val completion = new ExecutorCompletionService[RPCContinuousShuffleMessage](executor)
--- End diff --

It cannot be. There's a deadlock scenario where the queue is filled with 
records from epoch N before all writers have sent the marker for epoch N - 1.


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-06-12 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r194950212
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances.
+ *
+ * @param writerId          The partition ID of this writer.
+ * @param outputPartitioner The partitioner on the reader side of the shuffle.
+ * @param endpoints         The [[RPCContinuousShuffleReader]] endpoints to write to.
+ *                          Indexed by partition ID within outputPartitioner.
+ */
+class RPCContinuousShuffleWriter(
+    writerId: Int,
+    outputPartitioner: Partitioner,
+    endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter {
+
+  if (outputPartitioner.numPartitions != 1) {
+    throw new IllegalArgumentException("multiple readers not yet supported")
+  }
+
+  if (outputPartitioner.numPartitions != endpoints.length) {
+    throw new IllegalArgumentException(s"partitioner size ${outputPartitioner.numPartitions} did " +
+      s"not match endpoint count ${endpoints.length}")
+  }
+
+  def write(epoch: Iterator[UnsafeRow]): Unit = {
+    while (epoch.hasNext) {
+      val row = epoch.next()
+      endpoints(outputPartitioner.getPartition(row)).askSync[Unit](ReceiverRow(writerId, row))
+    }
+
+    endpoints.foreach(_.askSync[Unit](ReceiverEpochMarker(writerId)))
--- End diff --

Sure, but I don't think there's any benefit to doing so. We need to 
sequence the messages across epochs too, so there's little parallelization 
available that way.


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r194950151
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.language.reflectiveCalls
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.StreamingQueryListener._
+
+
+class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+
+  override protected def sparkConf: SparkConf =
+    super.sparkConf.set("spark.sql.streamingQueryListeners",
--- End diff --

There seems to be a mistake here.


---




[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...

2018-06-12 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21529
  
I think for this PR, apart from the end-to-end test that checks the result, we 
should also have a unit test in `PlannerSuite` to cover this case. In the 
follow-up, we can add config support for the golden-file tests and add more 
test cases.
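
For illustration, a rough sketch of such a unit test, following the `DummySparkPlan` pattern the suite already uses (the exact plan shape and assertion here are assumptions, not the final test):

```scala
test("EnsureRequirements returns the correct plan for this case") {
  val exprs = Seq(Literal(1))
  // Build a plan whose child's partitioning does not satisfy the
  // required distribution, then apply the rule directly.
  val inputPlan = DummySparkPlan(
    children = DummySparkPlan(outputPartitioning = SinglePartition) :: Nil,
    requiredChildDistribution = Seq(ClusteredDistribution(exprs)),
    requiredChildOrdering = Seq(Seq.empty))
  val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
  // Assert on whether the rule inserted the expected exchange.
  assert(outputPlan.collect { case e: ShuffleExchangeExec => e }.nonEmpty)
}
```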


---




[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21504
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91737/
Test FAILed.


---




[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21504
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21504
  
**[Test build #91737 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91737/testReport)**
 for PR 21504 at commit 
[`b87c90b`](https://github.com/apache/spark/commit/b87c90b6bb356e0faaf8230cb6b20fbcdd65c858).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r194948874
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3236,13 +3236,49 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark.
+   * Collect a Dataset as Arrow batches and serve stream to PySpark.
    */
   private[sql] def collectAsArrowToPython(): Array[Any] = {
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+
     withAction("collectAsArrowToPython", queryExecution) { plan =>
-      val iter: Iterator[Array[Byte]] =
-        toArrowPayload(plan).collect().iterator.map(_.asPythonSerializable)
-      PythonRDD.serveIterator(iter, "serve-Arrow")
+      PythonRDD.serveToStream("serve-Arrow") { out =>
+        val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
+        val arrowBatchRdd = getArrowBatchRdd(plan)
+        val numPartitions = arrowBatchRdd.partitions.length
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
+        var lastIndex = -1  // index of last partition written
+
+        // Handler to eagerly write partitions to Python in order
+        def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
+          // If result is from next partition in order
+          if (index - 1 == lastIndex) {
+            batchWriter.writeBatches(arrowBatches.iterator)
+            lastIndex += 1
+            // Write stored partitions that come next in order
+            while (lastIndex < results.length && results(lastIndex) != null) {
+              batchWriter.writeBatches(results(lastIndex).iterator)
+              results(lastIndex) = null
+              lastIndex += 1
+            }
+            // After last batch, end the stream
+            if (lastIndex == results.length) {
+              batchWriter.end()
+            }
+          } else {
+            // Store partitions received out of order
+            results(index - 1) = arrowBatches
+          }
+        }
+
+        sparkSession.sparkContext.runJob(
+          arrowBatchRdd,
+          (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
+          0 until numPartitions,
+          handlePartitionBatches)
--- End diff --

Instead of collecting all partitions back at once and holding out-of-order 
partitions on the driver while waiting for the earlier ones, would it be better 
to incrementally run jobs on the partitions in order and stream them to the 
Python side? That way we wouldn't need to hold out-of-order partitions on the driver.
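
A minimal sketch of that alternative, reusing the names from the diff above (a sketch under the assumption that per-partition job overhead is acceptable, not a drop-in replacement):

```scala
// One job per partition, in partition order: nothing is buffered on the
// driver, at the cost of scheduling numPartitions jobs instead of one.
for (i <- 0 until numPartitions) {
  val Array(arrowBatches) = sparkSession.sparkContext.runJob(
    arrowBatchRdd,
    (it: Iterator[Array[Byte]]) => it.toArray,
    Seq(i))  // run on partition i only
  batchWriter.writeBatches(arrowBatches.iterator)
}
batchWriter.end()
```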


---




[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r194949076
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala ---
@@ -34,17 +34,36 @@ private[sql] object PythonSQLUtils {
   }
 
   /**
-   * Python Callable function to convert ArrowPayloads into a [[DataFrame]].
+   * Python callable function to convert an RDD of serialized ArrowRecordBatches into
+   * a [[DataFrame]].
    *
-   * @param payloadRDD A JavaRDD of ArrowPayloads.
-   * @param schemaString JSON Formatted Schema for ArrowPayloads.
+   * @param arrowBatchRDD A JavaRDD of serialized ArrowRecordBatches.
+   * @param schemaString JSON Formatted Spark schema for Arrow batches.
    * @param sqlContext The active [[SQLContext]].
    * @return The converted [[DataFrame]].
    */
-  def arrowPayloadToDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+  def arrowStreamToDataFrame(
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    ArrowConverters.toDataFrame(payloadRDD, schemaString, sqlContext)
+    ArrowConverters.toDataFrame(arrowBatchRDD, schemaString, sqlContext)
+  }
+
+  /**
+   * Python callable function to read a file in Arrow stream format and create a [[DataFrame]]
+   * using each serialized ArrowRecordBatch as a partition.
+   *
+   * @param sqlContext The active [[SQLContext]].
+   * @param filename File to read the Arrow stream from.
+   * @param schemaString JSON Formatted Spark schema for Arrow batches.
+   * @return A new [[DataFrame]].
+   */
+  def arrowReadStreamFromFile(
+      sqlContext: SQLContext,
+      filename: String,
+      schemaString: String): DataFrame = {
+    JavaSparkContext.fromSparkContext(sqlContext.sparkContext)
--- End diff --

What is this line for?


---




[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...

2018-06-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21537#discussion_r194946640
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 ---
@@ -1004,26 +1014,30 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
   private[this] def castToIntervalCode(from: DataType): CastFunction = from match {
     case StringType =>
       (c, evPrim, evNull) =>
-        s"""$evPrim = CalendarInterval.fromString($c.toString());
+        code"""$evPrim = CalendarInterval.fromString($c.toString());
           if(${evPrim} == null) {
             ${evNull} = true;
           }
         """.stripMargin
 
   }
 
-  private[this] def decimalToTimestampCode(d: String): String =
-    s"($d.toBigDecimal().bigDecimal().multiply(new java.math.BigDecimal(1000000L))).longValue()"
-  private[this] def longToTimeStampCode(l: String): String = s"$l * 1000000L"
-  private[this] def timestampToIntegerCode(ts: String): String =
-    s"java.lang.Math.floor((double) $ts / 1000000L)"
-  private[this] def timestampToDoubleCode(ts: String): String = s"$ts / 1000000.0"
+  private[this] def decimalToTimestampCode(d: ExprValue): ExprValue =
+    JavaCode.expression(
+      s"($d.toBigDecimal().bigDecimal().multiply(new java.math.BigDecimal(1000000L))).longValue()",
--- End diff --

hmm, here we should use `Block`. Fixed.
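
For context, a hedged sketch of the distinction (this leans on the codegen `Block`/`JavaCode` APIs this PR series introduces; treat the exact signatures as assumptions):

```scala
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._

val ts = JavaCode.variable("ts", classOf[Long])

// JavaCode.expression wraps a single Java expression with a known type...
val asExpr = JavaCode.expression(s"$ts / 1000000.0", classOf[Double])

// ...while the code"..." interpolator builds a Block, which keeps track of
// the ExprValues interpolated into it (what the review asks for here).
val asBlock = code"double seconds = $ts / 1000000.0;"
```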


---




[GitHub] spark issue #21535: [SPARK-23596][SQL][WIP] Test interpreted path on Dataset...

2018-06-12 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/21535
  
@hvanhovell I measured the testing time: for the same test, the additional 
interpreted path adds roughly 50-120% more time.

Interpreted encoders? Do you mean testing the interpreted path only for 
`org.apache.spark.sql.catalyst.encoders.*`?




---




[GitHub] spark issue #21515: [SPARK-24372][build] Add scripts to help with preparing ...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21515
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21515: [SPARK-24372][build] Add scripts to help with preparing ...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21515
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91734/
Test PASSed.


---




[GitHub] spark issue #21515: [SPARK-24372][build] Add scripts to help with preparing ...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21515
  
**[Test build #91734 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91734/testReport)**
 for PR 21515 at commit 
[`04f6371`](https://github.com/apache/spark/commit/04f6371a3aa0f03ba1c37b5b450bea69923388e9).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21539: [SPARK-24500][SQL] Make sure streams are materialized du...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21539
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91735/
Test PASSed.


---




[GitHub] spark issue #21539: [SPARK-24500][SQL] Make sure streams are materialized du...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21539
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3961/
Test PASSed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/72/
Test PASSed.


---




[GitHub] spark issue #21539: [SPARK-24500][SQL] Make sure streams are materialized du...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21539
  
**[Test build #91735 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91735/testReport)**
 for PR 21539 at commit 
[`d5832f4`](https://github.com/apache/spark/commit/d5832f4b50d4c8f84feb462291d8da37e87b192f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21547
  
**[Test build #91739 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91739/testReport)**
 for PR 21547 at commit 
[`0f37c94`](https://github.com/apache/spark/commit/0f37c94046e8bc6c0c96cb150b0c7d8ad99f72a2).


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/71/
Test PASSed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3960/
Test PASSed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21547
  
**[Test build #91738 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91738/testReport)**
 for PR 21547 at commit 
[`9606670`](https://github.com/apache/spark/commit/96066701ec75d3caa27994c47eab8ff64150b6a5).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21547
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91738/
Test FAILed.


---




[GitHub] spark issue #21547: [SPARK-24538][SQL] ByteArrayDecimalType support push dow...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21547
  
**[Test build #91738 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91738/testReport)**
 for PR 21547 at commit 
[`9606670`](https://github.com/apache/spark/commit/96066701ec75d3caa27994c47eab8ff64150b6a5).


---




[GitHub] spark pull request #21547: [SPARK-24538][SQL] ByteArrayDecimalType support p...

2018-06-12 Thread wangyum
GitHub user wangyum opened a pull request:

https://github.com/apache/spark/pull/21547

[SPARK-24538][SQL] ByteArrayDecimalType support push down to the data 
sources

## What changes were proposed in this pull request?


Support filter push down to the data sources for 
[ByteArrayDecimalType](https://github.com/apache/spark/blob/e28eb431146bcdcaf02a6f6c406ca30920592a6a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala#L230).

## How was this patch tested?
Unit tests and manual tests.

**manual tests**:
```scala
spark.range(1000)
  .selectExpr(
    "id",
    "cast(id as decimal(9)) as d1",
    "cast(id as decimal(9, 2)) as d2",
    "cast(id as decimal(18)) as d3",
    "cast(id as decimal(18, 4)) as d4",
    "cast(id as decimal(38)) as d5",
    "cast(id as decimal(38, 18)) as d6")
  .coalesce(1)
  .write
  .option("parquet.block.size", 1048576)
  .parquet("/tmp/spark/parquet/decimal")

val df = spark.read.parquet("/tmp/spark/parquet/decimal/")
// Only reads about 1 MB of data
df.filter("d6 = 1").show
// Reads 174.3 MB of data
df.filter("d3 = 1").show
```
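
For background, the comparison that makes this pushdown work: the filter value has to be encoded as the same fixed-length, big-endian two's-complement byte array that Parquet stores for byte-array-backed decimals. A hedged sketch of that conversion (helper name and exact shape are assumptions, not necessarily this PR's code):

```scala
import java.math.{BigDecimal => JBigDecimal}

// Sign-extend the unscaled value into a fixed-length, big-endian buffer so
// that Parquet's byte-wise comparison agrees with numeric comparison.
def decimalToFixedLengthBytes(decimal: JBigDecimal, numBytes: Int): Array[Byte] = {
  val unscaled = decimal.unscaledValue().toByteArray  // big-endian two's complement
  if (unscaled.length == numBytes) {
    unscaled
  } else {
    val buffer = new Array[Byte](numBytes)
    val signByte: Byte = if (unscaled.head < 0) (-1).toByte else 0.toByte
    java.util.Arrays.fill(buffer, 0, numBytes - unscaled.length, signByte)
    System.arraycopy(unscaled, 0, buffer, numBytes - unscaled.length, unscaled.length)
    buffer
  }
}
```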


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangyum/spark SPARK-24538

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21547


commit 96066701ec75d3caa27994c47eab8ff64150b6a5
Author: Yuming Wang 
Date:   2018-06-13T01:35:55Z

ByteArrayDecimalType support push down to the data sources




---




[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21546
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91733/
Test PASSed.


---




[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...

2018-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21546
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21546
  
**[Test build #91733 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91733/testReport)**
 for PR 21546 at commit 
[`a5a1fbe`](https://github.com/apache/spark/commit/a5a1fbe7121c5b0dd93876a56c29ad17dcd9b168).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-12 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21504
  
**[Test build #91737 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91737/testReport)**
 for PR 21504 at commit 
[`b87c90b`](https://github.com/apache/spark/commit/b87c90b6bb356e0faaf8230cb6b20fbcdd65c858).


---



