Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21546#discussion_r212171980
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
    @@ -183,34 +178,106 @@ private[sql] object ArrowConverters {
       }
     
       /**
    -   * Convert a byte array to an ArrowRecordBatch.
    +   * Load a serialized ArrowRecordBatch.
        */
    -  private[arrow] def byteArrayToBatch(
    +  private[arrow] def loadBatch(
           batchBytes: Array[Byte],
           allocator: BufferAllocator): ArrowRecordBatch = {
    -    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
    -    val reader = new ArrowFileReader(in, allocator)
    -
    -    // Read a batch from a byte stream, ensure the reader is closed
    -    Utils.tryWithSafeFinally {
    -      val root = reader.getVectorSchemaRoot  // throws IOException
    -      val unloader = new VectorUnloader(root)
    -      reader.loadNextBatch()  // throws IOException
    -      unloader.getRecordBatch
    -    } {
    -      reader.close()
    -    }
    +    val in = new ByteArrayInputStream(batchBytes)
    +    MessageSerializer.deserializeRecordBatch(
    +      new ReadChannel(Channels.newChannel(in)), allocator)  // throws 
IOException
       }
     
    +  /**
    +   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
    +   */
       private[sql] def toDataFrame(
    -      payloadRDD: JavaRDD[Array[Byte]],
    +      arrowBatchRDD: JavaRDD[Array[Byte]],
           schemaString: String,
           sqlContext: SQLContext): DataFrame = {
    -    val rdd = payloadRDD.rdd.mapPartitions { iter =>
    +    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
    +    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
    +    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
           val context = TaskContext.get()
    -      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), 
context)
    +      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
         }
    -    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
         sqlContext.internalCreateDataFrame(rdd, schema)
       }
    +
    +  /**
    +   * Read a file as an Arrow stream and parallelize as an RDD of 
serialized ArrowRecordBatches.
    +   */
    +  private[sql] def readArrowStreamFromFile(
    +      sqlContext: SQLContext,
    +      filename: String): JavaRDD[Array[Byte]] = {
    +    val fileStream = new FileInputStream(filename)
    +    try {
    +      // Create array so that we can safely close the file
    +      val batches = getBatchesFromStream(fileStream.getChannel).toArray
    +      // Parallelize the record batches to create an RDD
    +      JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, 
batches.length))
    --- End diff --
    
    so this the length of the array of batches, not the number of records in 
the batch.  The input is split according to the default parallelism config.  So 
if that is 32, we will have an array of 32 batches and then parallelize those 
to 32 partitions. `parallelize` might usually have one big array of primitives 
as the first arg, that you then partition by the number in the second arg, but 
this is a little different since we are using batches. Does that answer your 
question?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to