This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 1c21bc4 [improve] Reduce the performance loss of additional buffer expansion. (#143)
1c21bc4 is described below
commit 1c21bc46d971eaed7df24e0382329bbbae4ed4c3
Author: Chuang Li <[email protected]>
AuthorDate: Wed Oct 25 15:02:38 2023 +0800
[improve] Reduce the performance loss of additional buffer expansion. (#143)
---
.../doris/spark/load/RecordBatchInputStream.java | 88 +++++++---------------
1 file changed, 29 insertions(+), 59 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
index f70809b..a361c39 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
@@ -40,8 +40,6 @@ public class RecordBatchInputStream extends InputStream {
public static final Logger LOG = LoggerFactory.getLogger(RecordBatchInputStream.class);
- private static final int DEFAULT_BUF_SIZE = 4096;
-
/**
* Load record batch
*/
@@ -55,7 +53,12 @@ public class RecordBatchInputStream extends InputStream {
/**
* record buffer
*/
- private ByteBuffer buffer = ByteBuffer.allocate(0);
+
+ private ByteBuffer lineBuf = ByteBuffer.allocate(0);;
+
+ private ByteBuffer delimBuf = ByteBuffer.allocate(0);
+
+ private final byte[] delim;
/**
* record count has been read
@@ -70,31 +73,42 @@ public class RecordBatchInputStream extends InputStream {
public RecordBatchInputStream(RecordBatch recordBatch, boolean passThrough) {
this.recordBatch = recordBatch;
this.passThrough = passThrough;
+ this.delim = recordBatch.getDelim();
}
@Override
public int read() throws IOException {
try {
- if (buffer.remaining() == 0 && endOfBatch()) {
- return -1; // End of stream
+ if (lineBuf.remaining() == 0 && endOfBatch()) {
+ return -1;
+ }
+
+ if (delimBuf != null && delimBuf.remaining() > 0) {
+ return delimBuf.get() & 0xff;
}
} catch (DorisException e) {
throw new IOException(e);
}
- return buffer.get() & 0xFF;
+ return lineBuf.get() & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
- if (buffer.remaining() == 0 && endOfBatch()) {
- return -1; // End of stream
+ if (lineBuf.remaining() == 0 && endOfBatch()) {
+ return -1;
+ }
+
+ if (delimBuf != null && delimBuf.remaining() > 0) {
+ int bytesRead = Math.min(len, delimBuf.remaining());
+ delimBuf.get(b, off, bytesRead);
+ return bytesRead;
}
} catch (DorisException e) {
throw new IOException(e);
}
- int bytesRead = Math.min(len, buffer.remaining());
- buffer.get(b, off, bytesRead);
+ int bytesRead = Math.min(len, lineBuf.remaining());
+ lineBuf.get(b, off, bytesRead);
return bytesRead;
}
@@ -109,6 +123,7 @@ public class RecordBatchInputStream extends InputStream {
public boolean endOfBatch() throws DorisException {
Iterator<InternalRow> iterator = recordBatch.getIterator();
if (readCount >= recordBatch.getBatchSize() || !iterator.hasNext()) {
+ delimBuf = null;
return true;
}
readNext(iterator);
@@ -125,62 +140,18 @@ public class RecordBatchInputStream extends InputStream {
if (!iterator.hasNext()) {
throw new ShouldNeverHappenException();
}
- byte[] delim = recordBatch.getDelim();
byte[] rowBytes = rowToByte(iterator.next());
if (isFirst) {
- ensureCapacity(rowBytes.length);
- buffer.put(rowBytes);
- buffer.flip();
+ delimBuf = null;
+ lineBuf = ByteBuffer.wrap(rowBytes);
isFirst = false;
} else {
- ensureCapacity(delim.length + rowBytes.length);
- buffer.put(delim);
- buffer.put(rowBytes);
- buffer.flip();
+ delimBuf = ByteBuffer.wrap(delim);
+ lineBuf = ByteBuffer.wrap(rowBytes);
}
readCount++;
}
- /**
- * Check if the buffer has enough capacity.
- *
- * @param need required buffer space
- */
- private void ensureCapacity(int need) {
-
- int capacity = buffer.capacity();
-
- if (need <= capacity) {
- buffer.clear();
- return;
- }
-
- // need to extend
- int newCapacity = calculateNewCapacity(capacity, need);
- LOG.info("expand buffer, min cap: {}, now cap: {}, new cap: {}", need, capacity, newCapacity);
- buffer = ByteBuffer.allocate(newCapacity);
-
- }
-
- /**
- * Calculate new capacity for buffer expansion.
- *
- * @param capacity current buffer capacity
- * @param minCapacity required min buffer space
- * @return new capacity
- */
- private int calculateNewCapacity(int capacity, int minCapacity) {
- int newCapacity = 0;
- if (capacity == 0) {
- newCapacity = DEFAULT_BUF_SIZE;
-
- }
- while (newCapacity < minCapacity) {
- newCapacity = newCapacity << 1;
- }
- return newCapacity;
- }
-
/**
* Convert Spark row data to byte array
*
@@ -220,5 +191,4 @@ public class RecordBatchInputStream extends InputStream {
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]