[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-10 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105466801
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
--- End diff --

Yes. Feel free to send a PR to Arrow for this.
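
For reference, a minimal sketch of how the channel plugs into the ArrowReader API quoted in this diff (the same readFooter/getRecordBatches sequence appears in readPayloadByteArrays further down); payloadBytes is an illustrative input, not part of the PR:

    import scala.collection.JavaConverters._
    import org.apache.arrow.memory.RootAllocator
    import org.apache.arrow.vector.file.ArrowReader

    // Wrap raw bytes so the Arrow file reader can seek within them.
    val payloadBytes: Array[Byte] = ???  // illustrative input
    val allocator = new RootAllocator(Long.MaxValue)
    val in = new ByteArrayReadableSeekableByteChannel(payloadBytes)
    val reader = new ArrowReader(in, allocator)
    val footer = reader.readFooter()
    val batchBlocks = footer.getRecordBatches.asScala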


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105323172
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = {
+val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableSeekableByteChannel(payloadBytes)
+  val reader = new ArrowReader(in, _allocator)
+  val footer = reader.readFooter()
+  val batchBlocks = footer.getRecordBatches.asScala.toArray
+  batchBlocks.foreach(block => batches += 
re

[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105322928
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = {
+val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableSeekableByteChannel(payloadBytes)
+  val reader = new ArrowReader(in, _allocator)
+  val footer = reader.readFooter()
+  val batchBlocks = footer.getRecordBatches.asScala.toArray
+  batchBlocks.foreach(block => batches += 
re

[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105324269
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ArrowConvertersSuite.scala ---
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, Timestamp}
+import java.text.SimpleDateFormat
+import java.util.Locale
+
+import com.google.common.io.Files
+import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot}
+import org.apache.arrow.vector.file.json.JsonFileReader
+import org.apache.arrow.vector.util.Validator
+import org.json4s.jackson.JsonMethods._
+import org.json4s.JsonAST._
+import org.json4s.JsonDSL._
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+
+class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
+  import testImplicits._
+
+  private var tempDataPath: String = _
+
+  private def collectAsArrow(df: DataFrame,
+ converter: Option[ArrowConverters] = None): ArrowPayload = {
+val cnvtr = converter.getOrElse(new ArrowConverters)
+val payloadByteArrays = df.toArrowPayloadBytes().collect()
+cnvtr.readPayloadByteArrays(payloadByteArrays)
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+tempDataPath = Utils.createTempDir(namePrefix = "arrow").getAbsolutePath
+  }
+
+  test("collect to arrow record batch") {
+val indexData = (1 to 6).toDF("i")
+val arrowPayload = collectAsArrow(indexData)
+assert(arrowPayload.nonEmpty)
+val arrowBatches = arrowPayload.toArray
+assert(arrowBatches.length == indexData.rdd.getNumPartitions)
+val rowCount = arrowBatches.map(batch => batch.getLength).sum
+assert(rowCount === indexData.count())
+arrowBatches.foreach(batch => assert(batch.getNodes.size() > 0))
+arrowBatches.foreach(batch => batch.close())
+  }
+
+  test("numeric type conversion") {
+collectAndValidate(indexData)
+collectAndValidate(shortData)
+collectAndValidate(intData)
+collectAndValidate(longData)
+collectAndValidate(floatData)
+collectAndValidate(doubleData)
+  }
+
+  test("mixed numeric type conversion") {
+collectAndValidate(mixedNumericData)
+  }
+
+  test("boolean type conversion") {
+collectAndValidate(boolData)
+  }
+
+  test("string type conversion") {
+collectAndValidate(stringData)
+  }
+
+  test("byte type conversion") {
+collectAndValidate(byteData)
+  }
+
+  test("timestamp conversion") {
+collectAndValidate(timestampData)
+  }
+
+  // TODO: Not currently supported in Arrow JSON reader
+  ignore("date conversion") {
+// collectAndValidate(dateTimeData)
+  }
+
+  // TODO: Not currently supported in Arrow JSON reader
+  ignore("binary type conversion") {
+// collectAndValidate(binaryData)
+  }
+
+  test("floating-point NaN") {
+collectAndValidate(floatNaNData)
+  }
+
+  test("partitioned DataFrame") {
+val converter = new ArrowConverters
+val schema = testData2.schema
+val arrowPayload = collectAsArrow(testData2, Some(converter))
+val arrowBatches = arrowPayload.toArray
+// NOTE: testData2 should have 2 partitions -> 2 arrow batches in payload
+assert(arrowBatches.length === 2)
+val pl1 = new ArrowStaticPayload(arrowBatches(0))
+val pl2 = new ArrowStaticP

[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105321731
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
--- End diff --

if we create an allocator we should have a way to close it in the end.
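
A minimal sketch of one way to address this, assuming the wrapper class stays as quoted above; the close() method and the try/finally at the call site are illustrative, not part of the PR:

    private[sql] class ArrowConverters {
      private val _allocator = new RootAllocator(Long.MaxValue)

      private[sql] def allocator: RootAllocator = _allocator

      // Release all Arrow buffers owned by this converter's allocator.
      def close(): Unit = _allocator.close()
    }

    // Caller owns the converter's lifecycle:
    val converters = new ArrowConverters
    try {
      // ... build or read Arrow record batches ...
    } finally {
      converters.close()
    }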





[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105322470
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = {
+val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableSeekableByteChannel(payloadBytes)
+  val reader = new ArrowReader(in, _allocator)
+  val footer = reader.readFooter()
+  val batchBlocks = footer.getRecordBatches.asScala.toArray
+  batchBlocks.foreach(block => batches += 
re

[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105322707
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = {
+val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableSeekableByteChannel(payloadBytes)
+  val reader = new ArrowReader(in, _allocator)
+  val footer = reader.readFooter()
+  val batchBlocks = footer.getRecordBatches.asScala.toArray
+  batchBlocks.foreach(block => batches += 
re

[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105322098
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = {
+val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableSeekableByteChannel(payloadBytes)
+  val reader = new ArrowReader(in, _allocator)
+  val footer = reader.readFooter()
+  val batchBlocks = footer.getRecordBatches.asScala.toArray
+  batchBlocks.foreach(block => batches += 
re

[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105322283
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = {
+val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableSeekableByteChannel(payloadBytes)
+  val reader = new ArrowReader(in, _allocator)
+  val footer = reader.readFooter()
+  val batchBlocks = footer.getRecordBatches.asScala.toArray
+  batchBlocks.foreach(block => batches += 
re

[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105321652
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+
+  private[sql] def allocator: RootAllocator = _allocator
+
+  def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = {
+val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, allocator)
+new ArrowStaticPayload(batch)
+  }
+
+  def readPayloadByteArrays(payloadByteArrays: Array[Array[Byte]]): ArrowPayload = {
+val batches = scala.collection.mutable.ArrayBuffer.empty[ArrowRecordBatch]
+var i = 0
+while (i < payloadByteArrays.length) {
+  val payloadBytes = payloadByteArrays(i)
+  val in = new ByteArrayReadableSeekableByteChannel(payloadBytes)
+  val reader = new ArrowReader(in, _allocator)
+  val footer = reader.readFooter()
+  val batchBlocks = footer.getRecordBatches.asScala.toArray
+  batchBlocks.foreach(block => batches += 
re

[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105321141
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
--- End diff --

isn't length an Int already?
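
For reference, a small sketch of the simplification being suggested (same method as quoted above, with only the redundant conversion removed):

    override def read(dst: ByteBuffer): Int = {
      val remainingBuf = byteArray.length - _position
      // Math.min(Int, Long) widens to Long, so one .toInt is needed here...
      val length = Math.min(dst.remaining(), remainingBuf).toInt
      dst.put(byteArray, _position.toInt, length)
      _position += length
      length  // ...but length is already an Int, so the trailing .toInt is redundant
    }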





[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-03-09 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r105324023
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ArrowConverters.scala ---
@@ -0,0 +1,411 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, SeekableByteChannel}
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BaseAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file.{ArrowReader, ArrowWriter}
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * ArrowReader requires a seekable byte channel.
+ * NOTE - this is taken from test org.apache.vector.file, see about moving to public util pkg
+ */
+private[sql] class ByteArrayReadableSeekableByteChannel(var byteArray: Array[Byte])
+  extends SeekableByteChannel {
+  var _position: Long = 0L
+
+  override def isOpen: Boolean = {
+byteArray != null
+  }
+
+  override def close(): Unit = {
+byteArray = null
+  }
+
+  override def read(dst: ByteBuffer): Int = {
+val remainingBuf = byteArray.length - _position
+val length = Math.min(dst.remaining(), remainingBuf).toInt
+dst.put(byteArray, _position.toInt, length)
+_position += length
+length.toInt
+  }
+
+  override def position(): Long = _position
+
+  override def position(newPosition: Long): SeekableByteChannel = {
+_position = newPosition.toLong
+this
+  }
+
+  override def size: Long = {
+byteArray.length.toLong
+  }
+
+  override def write(src: ByteBuffer): Int = {
+throw new UnsupportedOperationException("Read Only")
+  }
+
+  override def truncate(size: Long): SeekableByteChannel = {
+throw new UnsupportedOperationException("Read Only")
+  }
+}
+
+/**
+ * Intermediate data structure returned from Arrow conversions
+ */
+private[sql] abstract class ArrowPayload extends Iterator[ArrowRecordBatch]
+
+/**
+ * Build a payload from existing ArrowRecordBatches
+ */
+private[sql] class ArrowStaticPayload(batches: ArrowRecordBatch*) extends ArrowPayload {
+  private val iter = batches.iterator
+  override def next(): ArrowRecordBatch = iter.next()
+  override def hasNext: Boolean = iter.hasNext
+}
+
+/**
+ * Class that wraps an Arrow RootAllocator used in conversion
+ */
+private[sql] class ArrowConverters {
+  private val _allocator = new RootAllocator(Long.MaxValue)
--- End diff --

the allocator should probably be passed in from where it is centrally initialized
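
One way to express that, sketched under the assumption that the caller creates and owns a single allocator; the constructor parameter and the call-site comments are illustrative, not part of the PR as written:

    // Accept an externally managed allocator instead of creating one per converter.
    private[sql] class ArrowConverters(allocator: BaseAllocator) {

      def interalRowIterToPayload(rowIter: Iterator[InternalRow], schema: StructType): ArrowPayload = {
        val batch = ArrowConverters.internalRowIterToArrowBatch(rowIter, schema, allocator)
        new ArrowStaticPayload(batch)
      }
    }

    // Call site, where the allocator is centrally initialized and later closed:
    // val rootAllocator = new RootAllocator(Long.MaxValue)
    // val converters = new ArrowConverters(rootAllocator)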





[GitHub] spark issue #16281: [SPARK-13127][SQL] Update Parquet to 1.9.0

2017-01-27 Thread julienledem
Github user julienledem commented on the issue:

https://github.com/apache/spark/pull/16281
  
FYI: Parquet 1.8.2 vote thread passed: 
https://mail-archives.apache.org/mod_mbox/parquet-dev/201701.mbox/%3CCAO4re1mHLT%2BLYn8s1RTEDZK8-9WSVugY8-HQqAN%2BtU%3DBOi1L9w%40mail.gmail.com%3E





[GitHub] spark issue #16281: [SPARK-13127][SQL] Update Parquet to 1.9.0

2016-12-15 Thread julienledem
Github user julienledem commented on the issue:

https://github.com/apache/spark/pull/16281
  
Like @rdblue said, I don't recall people asking for a 1.8.2 release on the Parquet dev list.
We are happy to help, and if there is a 1.8.x patch release branch, it is better to maintain it in the Parquet repo.
We can create that branch and will be happy to accept your patches for small bug fixes there.





[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-18 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45236894
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.unsafe.Platform;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A specialized RecordReader that reads into UnsafeRows directly using 
the Parquet column APIs.
+ *
+ * TODO: handle complex types.
+ */
+public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase {
+  /**
+   * Batch of unsafe rows that we assemble and the current index we've returned. Everytime this
+   * batch is used up (batchIdx == numBatched), we populated the batch.
+   */
+  private UnsafeRow[] rows = new UnsafeRow[64];
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each request column, the reader to read this column.
+   * columnsReaders[i] populated the UnsafeRow's attribute at i.
+   */
+  private ColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been reading, including the current in flight row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+  throws IOException, InterruptedException {
+super.initialize(inputSplit, taskAttemptContext);
+
+int rowByteSize = UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
+rowByteSize += 8 * requestedSchema.getFieldCount();
+
+byte[] buffer = new byte[rowByteSize * rows.length];
+for (int i = 0; i < rows.length; ++i) {
+  rows[i] = new UnsafeRow();
+  rows[i].pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * rowByteSize,
+  requestedSchema.getFieldCount(), rowByteSize);
+}
+
+/**
+ * Check that the requested schema is supported.
+ */
+for (Type t: requestedSchema.getFields()) {
+  if (!t.isPrimitive()) throw new IOException("Unsupported schema");
+}
+
+// TODO: this needs to handle the schema resolution rules
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (batchIdx >= numBatched) {
+  if (!loadBatch()) return false;
+}
+++batchIdx;
+return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, InterruptedException {
+return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Decodes a batch of values into `rows`. This function is the hot path.
+   */
+  private bo

[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45136303
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.unsafe.Platform;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A specialized RecordReader that reads into UnsafeRows directly using 
the Parquet column APIs.
+ *
+ * TODO: handle complex types.
+ */
+public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBase<UnsafeRow> {
+  /**
+   * Batch of unsafe rows that we assemble and the current index we've 
returned. Every time this
+   * batch is used up (batchIdx == numBatched), we repopulate the batch.
+   */
+  private UnsafeRow[] rows = new UnsafeRow[64];
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each requested column, the reader to read that column.
+   * columnReaders[i] populates the UnsafeRow's attribute at i.
+   */
+  private ColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read, including the current in-flight 
row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  throws IOException, InterruptedException {
+super.initialize(inputSplit, taskAttemptContext);
+
+int rowByteSize = 
UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
+rowByteSize += 8 * requestedSchema.getFieldCount();
+
+byte[] buffer = new byte[rowByteSize * rows.length];
+for (int i = 0; i < rows.length; ++i) {
+  rows[i] = new UnsafeRow();
+  rows[i].pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * rowByteSize,
+  requestedSchema.getFieldCount(), rowByteSize);
+}
+
+/**
+ * Check that the requested schema is supported.
+ */
+for (Type t: requestedSchema.getFields()) {
+  if (!t.isPrimitive()) throw new IOException("Unsupported schema");
+}
+
+// TODO: this needs to handle the schema resolution rules
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (batchIdx >= numBatched) {
+  if (!loadBatch()) return false;
+}
+++batchIdx;
+return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, 
InterruptedException {
+return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Decodes a batch of values into `rows`. This function is the hot path.
+   */
+  private bo
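
The quote is cut off just as loadBatch() begins, but its contract is visible from nextKeyValue() and getCurrentValue(): when the current batch is exhausted it must either refill `rows`, reset batchIdx and set numBatched, or return false at end of input. A sketch of that refill loop with the Parquet decoding stubbed out (the stub is only an assumption about the contract, not the PR's implementation):

    // Sketch of the batch-refill contract implied by nextKeyValue()/getCurrentValue().
    // The real loadBatch() decodes Parquet pages into UnsafeRows; here it only
    // fabricates strings so the control flow can run on its own.
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    class BatchedReaderSketch implements Iterator<String> {
      private final String[] rows = new String[4];  // stands in for UnsafeRow[64]
      private int batchIdx = 0;                     // next row to hand out
      private int numBatched = 0;                   // rows currently in the batch
      private int rowsLoaded = 0;
      private final int totalRowCount = 10;         // pretend row-group total

      // Contract: return false when nothing is left; otherwise refill `rows`,
      // reset batchIdx to 0 and set numBatched to the number of rows decoded.
      private boolean loadBatch() {
        if (rowsLoaded >= totalRowCount) return false;
        numBatched = Math.min(rows.length, totalRowCount - rowsLoaded);
        for (int i = 0; i < numBatched; ++i) rows[i] = "row-" + (rowsLoaded + i);
        rowsLoaded += numBatched;
        batchIdx = 0;
        return true;
      }

      @Override public boolean hasNext() {          // mirrors nextKeyValue()
        return batchIdx < numBatched || loadBatch();
      }

      @Override public String next() {              // mirrors getCurrentValue()
        if (!hasNext()) throw new NoSuchElementException();
        return rows[batchIdx++];
      }

      public static void main(String[] args) {
        new BatchedReaderSketch().forEachRemaining(System.out::println);
      }
    }

Keeping the expensive decode work inside loadBatch() and making the per-record calls trivial index bumps is what makes this the hot path worth optimizing.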

[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45135755
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.unsafe.Platform;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A specialized RecordReader that reads into UnsafeRows directly using 
the Parquet column APIs.
+ *
+ * TODO: handle complex types.
+ */
+public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBase<UnsafeRow> {
+  /**
+   * Batch of unsafe rows that we assemble and the current index we've 
returned. Every time this
+   * batch is used up (batchIdx == numBatched), we repopulate the batch.
+   */
+  private UnsafeRow[] rows = new UnsafeRow[64];
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each requested column, the reader to read that column.
+   * columnReaders[i] populates the UnsafeRow's attribute at i.
+   */
+  private ColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read, including the current in-flight 
row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  throws IOException, InterruptedException {
+super.initialize(inputSplit, taskAttemptContext);
+
+int rowByteSize = 
UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
+rowByteSize += 8 * requestedSchema.getFieldCount();
+
+byte[] buffer = new byte[rowByteSize * rows.length];
+for (int i = 0; i < rows.length; ++i) {
+  rows[i] = new UnsafeRow();
+  rows[i].pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * rowByteSize,
+  requestedSchema.getFieldCount(), rowByteSize);
+}
+
+/**
+ * Check that the requested schema is supported.
+ */
+for (Type t: requestedSchema.getFields()) {
+  if (!t.isPrimitive()) throw new IOException("Unsupported schema");
+}
+
+// TODO: this needs to handle the schema resolution rules
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (batchIdx >= numBatched) {
+  if (!loadBatch()) return false;
+}
+++batchIdx;
+return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, 
InterruptedException {
+return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Decodes a batch of values into `rows`. This function is the hot path.
+   */
+  private bo

[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45134955
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
 ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.values.ValuesReader;
+import 
org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.*;
+
+import static 
org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+/**
+ * Base class for custom RecordReaders for Parquet that directly 
materialize to `T`.
+ * This class handles computing row groups, filtering on them, setting up 
the column readers,
+ * etc.
+ * TODO: move this to the parquet-mr project. There are performance 
benefits of doing it
+ * this way, albeit at a higher cost to implement.
--- End diff --

This makes sense to me. We should have this in 
https://github.com/apache/parquet-mr/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45136501
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.unsafe.Platform;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A specialized RecordReader that reads into UnsafeRows directly using 
the Parquet column APIs.
+ *
+ * TODO: handle complex types.
+ */
+public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBase<UnsafeRow> {
+  /**
+   * Batch of unsafe rows that we assemble and the current index we've 
returned. Every time this
+   * batch is used up (batchIdx == numBatched), we repopulate the batch.
+   */
+  private UnsafeRow[] rows = new UnsafeRow[64];
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each requested column, the reader to read that column.
+   * columnReaders[i] populates the UnsafeRow's attribute at i.
+   */
+  private ColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read, including the current in-flight 
row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  throws IOException, InterruptedException {
+super.initialize(inputSplit, taskAttemptContext);
+
+int rowByteSize = 
UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
+rowByteSize += 8 * requestedSchema.getFieldCount();
+
+byte[] buffer = new byte[rowByteSize * rows.length];
+for (int i = 0; i < rows.length; ++i) {
+  rows[i] = new UnsafeRow();
+  rows[i].pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * rowByteSize,
+  requestedSchema.getFieldCount(), rowByteSize);
+}
+
+/**
+ * Check that the requested schema is supported.
+ */
+for (Type t: requestedSchema.getFields()) {
+  if (!t.isPrimitive()) throw new IOException("Unsupported schema");
+}
+
+// TODO: this needs to handle the schema resolution rules
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (batchIdx >= numBatched) {
+  if (!loadBatch()) return false;
+}
+++batchIdx;
+return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, 
InterruptedException {
+return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Decodes a batch of values into `rows`. This function is the hot path.
+   */
+  private bo

[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45136485
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.unsafe.Platform;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A specialized RecordReader that reads into UnsafeRows directly using 
the Parquet column APIs.
+ *
+ * TODO: handle complex types.
+ */
+public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBase<UnsafeRow> {
+  /**
+   * Batch of unsafe rows that we assemble and the current index we've 
returned. Every time this
+   * batch is used up (batchIdx == numBatched), we repopulate the batch.
+   */
+  private UnsafeRow[] rows = new UnsafeRow[64];
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each requested column, the reader to read that column.
+   * columnReaders[i] populates the UnsafeRow's attribute at i.
+   */
+  private ColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read, including the current in-flight 
row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  throws IOException, InterruptedException {
+super.initialize(inputSplit, taskAttemptContext);
+
+int rowByteSize = 
UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
+rowByteSize += 8 * requestedSchema.getFieldCount();
+
+byte[] buffer = new byte[rowByteSize * rows.length];
+for (int i = 0; i < rows.length; ++i) {
+  rows[i] = new UnsafeRow();
+  rows[i].pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * rowByteSize,
+  requestedSchema.getFieldCount(), rowByteSize);
+}
+
+/**
+ * Check that the requested schema is supported.
+ */
+for (Type t: requestedSchema.getFields()) {
+  if (!t.isPrimitive()) throw new IOException("Unsupported schema");
+}
+
+// TODO: this needs to handle the schema resolution rules
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (batchIdx >= numBatched) {
+  if (!loadBatch()) return false;
+}
+++batchIdx;
+return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, 
InterruptedException {
+return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Decodes a batch of values into `rows`. This function is the hot path.
+   */
+  private bo

[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45135353
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
 ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.values.ValuesReader;
+import 
org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.*;
+
+import static 
org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+/**
+ * Base class for custom RecordReaders for Parquet that directly 
materialize to `T`.
+ * This class handles computing row groups, filtering on them, setting up 
the column readers,
+ * etc.
+ * TODO: move this to the parquet-mr project. There are performance 
benefits of doing it
+ * this way, albeit at a higher cost to implement.
+ */
+public abstract class SpecificParquetRecordReaderBase<T> extends 
RecordReader<Void, T> {
+  protected Path file;
+  protected MessageType fileSchema;
+  protected MessageType requestedSchema;
+  protected ReadSupport<T> readSupport;
+
+  /**
+   * The total number of rows this RecordReader will eventually read. The 
sum of the
+   * rows of all the row groups.
+   */
+  protected long totalRowCount;
+
+  protected ParquetFileReader reader;
+
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  throws IOException, InterruptedException {
+Configuration configuration = taskAttemptContext.getConfiguration();
+ParquetInputSplit split = (ParquetInputSplit)inputSplit;
+this.file = split.getPath();
+long[] rowGroupOffsets = split.getRowGroupOffsets();
+
+ParquetMetadata footer;
+List<BlockMetaData> blocks;
+
+// if task.side.metadata is set, rowGroupOffsets is null
+if (rowGroupOffsets == null) {
+  // then we need to apply the predicate push down filter
+  footer = readFooter(configuration, file, range(split.getStart(), 
split.getEnd()));
+  MessageType fileSchema = footer.getFileMetaData().getSchema();
+  FilterCompat.Filter filter = getFilter(configuration);
+  blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+} else {
+  // otherwise we find the row groups that were selected on the client
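
The comments above describe the two ways a split can arrive: with task-side metadata (rowGroupOffsets == null) the footer for this split's byte range is read and the pushed-down filter prunes row groups, otherwise the driver has already chosen the row groups. A sketch of that branch using the parquet-mr calls already imported above; matching client-selected groups by BlockMetaData.getStartingPos() in the else branch is an assumption for illustration, not the PR's code:

    // Sketch of the two row-group selection paths described above.
    // Assumes parquet-mr and hadoop-mapreduce on the classpath.
    import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
    import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
    import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
    import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
    import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.hadoop.ParquetInputSplit;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.schema.MessageType;

    class RowGroupSelectionSketch {
      static List<BlockMetaData> selectRowGroups(Configuration conf, ParquetInputSplit split)
          throws IOException {
        Path file = split.getPath();
        long[] rowGroupOffsets = split.getRowGroupOffsets();
        if (rowGroupOffsets == null) {
          // task-side metadata: read only the footer range for this split and
          // apply the pushed-down filter to drop row groups that cannot match
          ParquetMetadata footer = readFooter(conf, file, range(split.getStart(), split.getEnd()));
          MessageType fileSchema = footer.getFileMetaData().getSchema();
          FilterCompat.Filter filter = getFilter(conf);
          return filterRowGroups(filter, footer.getBlocks(), fileSchema);
        } else {
          // client-side metadata: the driver already chose the row groups and put
          // their starting offsets in the split; keep only the matching blocks
          // (matching on getStartingPos() is an assumption made for this sketch)
          ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
          List<BlockMetaData> selected = new ArrayList<>();
          for (BlockMetaData block : footer.getBlocks()) {
            for (long offset : rowGroupOffsets) {
              if (block.getStartingPos() == offset) selected.add(block);
            }
          }
          return selected;
        }
      }
    }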

[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45136171
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.unsafe.Platform;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A specialized RecordReader that reads into UnsafeRows directly using 
the Parquet column APIs.
+ *
+ * TODO: handle complex types.
+ */
+public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBase<UnsafeRow> {
+  /**
+   * Batch of unsafe rows that we assemble and the current index we've 
returned. Every time this
+   * batch is used up (batchIdx == numBatched), we repopulate the batch.
+   */
+  private UnsafeRow[] rows = new UnsafeRow[64];
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each requested column, the reader to read that column.
+   * columnReaders[i] populates the UnsafeRow's attribute at i.
+   */
+  private ColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read, including the current in-flight 
row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  throws IOException, InterruptedException {
+super.initialize(inputSplit, taskAttemptContext);
+
+int rowByteSize = 
UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
+rowByteSize += 8 * requestedSchema.getFieldCount();
+
+byte[] buffer = new byte[rowByteSize * rows.length];
+for (int i = 0; i < rows.length; ++i) {
+  rows[i] = new UnsafeRow();
+  rows[i].pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * rowByteSize,
+  requestedSchema.getFieldCount(), rowByteSize);
+}
+
+/**
+ * Check that the requested schema is supported.
+ */
+for (Type t: requestedSchema.getFields()) {
+  if (!t.isPrimitive()) throw new IOException("Unsupported schema");
+}
+
+// TODO: this needs to handle the schema resolution rules
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (batchIdx >= numBatched) {
+  if (!loadBatch()) return false;
+}
+++batchIdx;
+return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, 
InterruptedException {
+return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Decodes a batch of values into `rows`. This function is the hot path.
+   */
+  private bo

[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45136577
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.unsafe.Platform;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A specialized RecordReader that reads into UnsafeRows directly using 
the Parquet column APIs.
+ *
+ * TODO: handle complex types.
+ */
+public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBase<UnsafeRow> {
+  /**
+   * Batch of unsafe rows that we assemble and the current index we've 
returned. Every time this
+   * batch is used up (batchIdx == numBatched), we repopulate the batch.
+   */
+  private UnsafeRow[] rows = new UnsafeRow[64];
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each requested column, the reader to read that column.
+   * columnReaders[i] populates the UnsafeRow's attribute at i.
+   */
+  private ColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read, including the current in-flight 
row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  throws IOException, InterruptedException {
+super.initialize(inputSplit, taskAttemptContext);
+
+int rowByteSize = 
UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
+rowByteSize += 8 * requestedSchema.getFieldCount();
+
+byte[] buffer = new byte[rowByteSize * rows.length];
+for (int i = 0; i < rows.length; ++i) {
+  rows[i] = new UnsafeRow();
+  rows[i].pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * rowByteSize,
+  requestedSchema.getFieldCount(), rowByteSize);
+}
+
+/**
+ * Check that the requested schema is supported.
+ */
+for (Type t: requestedSchema.getFields()) {
+  if (!t.isPrimitive()) throw new IOException("Unsupported schema");
+}
+
+// TODO: this needs to handle the schema resolution rules
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (batchIdx >= numBatched) {
+  if (!loadBatch()) return false;
+}
+++batchIdx;
+return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, 
InterruptedException {
+return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Decodes a batch of values into `rows`. This function is the hot path.
+   */
+  private bo

[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45136673
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.unsafe.Platform;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A specialized RecordReader that reads into UnsafeRows directly using 
the Parquet column APIs.
+ *
+ * TODO: handle complex types.
+ */
+public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBase<UnsafeRow> {
+  /**
+   * Batch of unsafe rows that we assemble and the current index we've 
returned. Every time this
+   * batch is used up (batchIdx == numBatched), we repopulate the batch.
+   */
+  private UnsafeRow[] rows = new UnsafeRow[64];
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each requested column, the reader to read that column.
+   * columnReaders[i] populates the UnsafeRow's attribute at i.
+   */
+  private ColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read, including the current in-flight 
row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  throws IOException, InterruptedException {
+super.initialize(inputSplit, taskAttemptContext);
+
+int rowByteSize = 
UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
+rowByteSize += 8 * requestedSchema.getFieldCount();
+
+byte[] buffer = new byte[rowByteSize * rows.length];
+for (int i = 0; i < rows.length; ++i) {
+  rows[i] = new UnsafeRow();
+  rows[i].pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * rowByteSize,
+  requestedSchema.getFieldCount(), rowByteSize);
+}
+
+/**
+ * Check that the requested schema is supported.
+ */
+for (Type t: requestedSchema.getFields()) {
+  if (!t.isPrimitive()) throw new IOException("Unsupported schema");
+}
+
+// TODO: this needs to handle the schema resolution rules
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (batchIdx >= numBatched) {
+  if (!loadBatch()) return false;
+}
+++batchIdx;
+return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, 
InterruptedException {
+return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Decodes a batch of values into `rows`. This function is the hot path.
+   */
+  private bo

[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45162142
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.unsafe.Platform;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A specialized RecordReader that reads into UnsafeRows directly using 
the Parquet column APIs.
+ *
+ * TODO: handle complex types.
+ */
+public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBase<UnsafeRow> {
+  /**
+   * Batch of unsafe rows that we assemble and the current index we've 
returned. Every time this
+   * batch is used up (batchIdx == numBatched), we repopulate the batch.
+   */
+  private UnsafeRow[] rows = new UnsafeRow[64];
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each requested column, the reader to read that column.
+   * columnReaders[i] populates the UnsafeRow's attribute at i.
+   */
+  private ColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read, including the current in-flight 
row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  throws IOException, InterruptedException {
+super.initialize(inputSplit, taskAttemptContext);
+
+int rowByteSize = 
UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
+rowByteSize += 8 * requestedSchema.getFieldCount();
+
+byte[] buffer = new byte[rowByteSize * rows.length];
+for (int i = 0; i < rows.length; ++i) {
+  rows[i] = new UnsafeRow();
+  rows[i].pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * rowByteSize,
+  requestedSchema.getFieldCount(), rowByteSize);
+}
+
+/**
+ * Check that the requested schema is supported.
+ */
+for (Type t: requestedSchema.getFields()) {
+  if (!t.isPrimitive()) throw new IOException("Unsupported schema");
+}
+
+// TODO: this needs to handle the schema resolution rules
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (batchIdx >= numBatched) {
+  if (!loadBatch()) return false;
+}
+++batchIdx;
+return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, 
InterruptedException {
+return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Decodes a batch of values into `rows`. This function is the hot path.
+   */
+  private bo

[GitHub] spark pull request: [WIP] [SPARK-11787] [SQL] Improve Parquet scan...

2015-11-17 Thread julienledem
Github user julienledem commented on a diff in the pull request:

https://github.com/apache/spark/pull/9774#discussion_r45162171
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
 ---
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.unsafe.Platform;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A specialized RecordReader that reads into UnsafeRows directly using 
the Parquet column APIs.
+ *
+ * TODO: handle complex types.
+ */
+public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBase<UnsafeRow> {
+  /**
+   * Batch of unsafe rows that we assemble and the current index we've 
returned. Every time this
+   * batch is used up (batchIdx == numBatched), we repopulate the batch.
+   */
+  private UnsafeRow[] rows = new UnsafeRow[64];
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each requested column, the reader to read that column.
+   * columnReaders[i] populates the UnsafeRow's attribute at i.
+   */
+  private ColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read, including the current in-flight 
row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+  throws IOException, InterruptedException {
+super.initialize(inputSplit, taskAttemptContext);
+
+int rowByteSize = 
UnsafeRow.calculateBitSetWidthInBytes(requestedSchema.getFieldCount());
+rowByteSize += 8 * requestedSchema.getFieldCount();
+
+byte[] buffer = new byte[rowByteSize * rows.length];
+for (int i = 0; i < rows.length; ++i) {
+  rows[i] = new UnsafeRow();
+  rows[i].pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * rowByteSize,
+  requestedSchema.getFieldCount(), rowByteSize);
+}
+
+/**
+ * Check that the requested schema is supported.
+ */
+for (Type t: requestedSchema.getFields()) {
+  if (!t.isPrimitive()) throw new IOException("Unsupported schema");
+}
+
+// TODO: this needs to handle the schema resolution rules
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (batchIdx >= numBatched) {
+  if (!loadBatch()) return false;
+}
+++batchIdx;
+return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, 
InterruptedException {
+return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Decodes a batch of values into `rows`. This function is the hot path.
+   */
+  private bo

[GitHub] spark pull request: Fix SPARK-1413: Parquet messes up stdout and s...

2014-04-09 Thread julienledem
Github user julienledem commented on the pull request:

https://github.com/apache/spark/pull/325#issuecomment-40033694
  
@pwendell I'm perfectly happy with removing this static initialization in 
parquet:

https://github.com/Parquet/parquet-mr/blob/41df19051825d724626e91425c8e690c04a39998/parquet-common/src/main/java/parquet/Log.java#L53
suggestions on how to do that better are welcome.
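
For context on the SPARK-1413 symptom: the linked Log class wires up java.util.logging in a static initializer, so the library starts writing to the console as soon as the class loads. One downstream workaround (a sketch only, using plain java.util.logging; the "parquet" logger name follows the package of the linked Log class, and this is not presented as the fix ultimately adopted in either project) is to strip the handlers that initializer installs and attach one at the level actually wanted:

    // Illustration only: keep a library's static java.util.logging setup from
    // writing straight to the console by replacing the handlers on its logger.
    import java.util.logging.ConsoleHandler;
    import java.util.logging.Handler;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    class SilenceParquetLogs {
      static void redirectParquetLogging() {
        Logger parquetLogger = Logger.getLogger("parquet");  // assumed logger name
        // drop any handlers the library's static initializer may have installed
        for (Handler handler : parquetLogger.getHandlers()) {
          parquetLogger.removeHandler(handler);
        }
        // stop messages from propagating to the root console handler,
        // then attach our own handler at the level we actually want
        parquetLogger.setUseParentHandlers(false);
        Handler quiet = new ConsoleHandler();
        quiet.setLevel(Level.SEVERE);
        parquetLogger.addHandler(quiet);
      }

      public static void main(String[] args) {
        redirectParquetLogging();
        Logger.getLogger("parquet").info("dropped by the SEVERE-level handler");
      }
    }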


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---