Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21440#discussion_r191182696
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala ---
    @@ -0,0 +1,154 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.io
    +
    +import java.nio.ByteBuffer
    +import java.nio.channels.WritableByteChannel
    +
    +import scala.util.Random
    +
    +import org.mockito.Mockito.when
    +import org.scalatest.BeforeAndAfterEach
    +import org.scalatest.mockito.MockitoSugar
    +
    +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
    +import org.apache.spark.internal.config
    +import org.apache.spark.util.io.ChunkedByteBuffer
    +
    +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar
    +    with BeforeAndAfterEach {
    +
    +  override protected def beforeEach(): Unit = {
    +    super.beforeEach()
    +    val conf = new SparkConf()
    +    val env = mock[SparkEnv]
    +    SparkEnv.set(env)
    +    when(env.conf).thenReturn(conf)
    +  }
    +
    +  override protected def afterEach(): Unit = {
    +    SparkEnv.set(null)
    +  }
    +
    +  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = {
    +    val bytes = (0 until nChunks).map { chunkIdx =>
    +      val bb = ByteBuffer.allocate(perChunk)
    +      (0 until perChunk).foreach { idx =>
    +        bb.put((chunkIdx * perChunk + idx).toByte)
    +      }
    +      bb.position(0)
    +      bb
    +    }.toArray
    +    new ChunkedByteBuffer(bytes)
    +  }
    +
    +  test("transferTo can stop and resume correctly") {
    +    SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
    +    val cbb = generateChunkByteBuffer(4, 10)
    +    val fileRegion = cbb.toNetty
    +
    +    val targetChannel = new LimitedWritableByteChannel(40)
    +
    +    var pos = 0L
    +    // write the fileregion to the channel, but with the transfer limited at various spots along
    +    // the way.
    +
    +    // limit to within the first chunk
    +    targetChannel.acceptNBytes = 5
    +    pos = fileRegion.transferTo(targetChannel, pos)
    +    assert(targetChannel.pos === 5)
    +
    +    // a little bit further within the first chunk
    +    targetChannel.acceptNBytes = 2
    +    pos += fileRegion.transferTo(targetChannel, pos)
    +    assert(targetChannel.pos === 7)
    +
    +    // past the first chunk, into the 2nd
    +    targetChannel.acceptNBytes = 6
    +    pos += fileRegion.transferTo(targetChannel, pos)
    +    assert(targetChannel.pos === 13)
    +
    +    // right to the end of the 2nd chunk
    +    targetChannel.acceptNBytes = 7
    +    pos += fileRegion.transferTo(targetChannel, pos)
    +    assert(targetChannel.pos === 20)
    +
    +    // rest of 2nd chunk, all of 3rd, some of 4th
    +    targetChannel.acceptNBytes = 15
    +    pos += fileRegion.transferTo(targetChannel, pos)
    +    assert(targetChannel.pos === 35)
    +
    +    // now till the end
    +    targetChannel.acceptNBytes = 5
    +    pos += fileRegion.transferTo(targetChannel, pos)
    +    assert(targetChannel.pos === 40)
    +
    +    // calling again at the end should be OK
    +    targetChannel.acceptNBytes = 20
    +    fileRegion.transferTo(targetChannel, pos)
    +    assert(targetChannel.pos === 40)
    +  }
    +
    +  test(s"transfer to with random limits") {
    +    val rng = new Random()
    +    val seed = System.currentTimeMillis()
    +    logInfo(s"seed = $seed")
    +    rng.setSeed(seed)
    +    val chunkSize = 1e4.toInt
    +    SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong)
    +
    +    val cbb = generateChunkByteBuffer(50, chunkSize)
    +    val fileRegion = cbb.toNetty
    +    val transferLimit = 1e5.toInt
    +    val targetChannel = new LimitedWritableByteChannel(transferLimit)
    +    while (targetChannel.pos < cbb.size) {
    +      val nextTransferSize = rng.nextInt(transferLimit)
    +      targetChannel.acceptNBytes = nextTransferSize
    +      fileRegion.transferTo(targetChannel, targetChannel.pos)
    +    }
    +    assert(0 === fileRegion.transferTo(targetChannel, targetChannel.pos))
    +  }
    +
    +  /**
    +   * This mocks a channel which only accepts a limited number of bytes at a time.  It also verifies
    +   * the written data matches our expectations as the data is received.
    +   * @param maxWriteSize the total number of bytes this channel can accept
    +   */
    +  private class LimitedWritableByteChannel(maxWriteSize: Int) extends WritableByteChannel {
    +    val bytes = new Array[Byte](maxWriteSize)
    +    var acceptNBytes = 0
    +    var pos = 0
    +
    +    override def write(src: ByteBuffer): Int = {
    +      val origSrcPos = src.position()
    +      val length = math.min(acceptNBytes, src.remaining())
    +      src.get(bytes, 0, length)
    --- End diff --
    
    Do we overwrite the previously written data in the `bytes` array here?
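    
    For illustration only, a hedged sketch of what I mean (not this PR's code; `AppendingLimitedChannel` is a made-up name): copying into `bytes` at the current `pos` offset would keep the data from earlier `write()` calls instead of starting at index 0 each time.
    
    ```scala
    import java.nio.ByteBuffer
    import java.nio.channels.WritableByteChannel
    
    // Sketch only: a channel that appends each write at the current position,
    // so data from earlier write() calls is preserved rather than overwritten.
    class AppendingLimitedChannel(maxWriteSize: Int) extends WritableByteChannel {
      val bytes = new Array[Byte](maxWriteSize)
      var acceptNBytes = 0
      var pos = 0
    
      override def write(src: ByteBuffer): Int = {
        // accept at most acceptNBytes, and never more than the source has left
        val length = math.min(acceptNBytes, src.remaining())
        // copy into `bytes` at offset `pos` (not 0), then advance the position
        src.get(bytes, pos, length)
        pos += length
        acceptNBytes -= length
        length
      }
    
      override def isOpen: Boolean = true
      override def close(): Unit = {}
    }
    ```
    
    That way an assertion over the whole `bytes` array after the transfer would still see every byte that was written, not just the most recent chunk.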

