I think Plasma is a generic object store, so you could potentially serialize all of the record batches into a single Arrow IPC stream and store the resulting bytes under one object ID.
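To make the "many batches, one object" idea concrete, here is a minimal sketch that deliberately does not use Arrow or Plasma. It packs several already-serialized payloads into one byte array with simple length-prefixed framing and unpacks them again. This is a stand-in technique, not the Arrow IPC stream format: an IPC stream does its own message framing, and in Arrow Java the ArrowStreamWriter and ArrowStreamReader classes are the usual way to produce and consume one. The class name BatchPack, the methods pack and unpack, the key "sorted-output", and the HashMap standing in for the Plasma client are all hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: frames several serialized payloads into one blob
// so that a single object ID can refer to all of them.
class BatchPack {

    // Layout: [count:int32] then, per payload, [length:int32][bytes].
    static byte[] pack(List<byte[]> payloads) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(payloads.size());
        for (byte[] payload : payloads) {
            out.writeInt(payload.length);
            out.write(payload);
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Reverses pack(): reads the count, then each length-prefixed payload.
    static List<byte[]> unpack(byte[] blob) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob));
        int count = in.readInt();
        List<byte[]> payloads = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            payloads.add(payload);
        }
        return payloads;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a plain map stands in for the Plasma client here.
        Map<String, byte[]> store = new HashMap<>();

        // Each byte[] stands in for one serialized record batch.
        List<byte[]> batches = Arrays.asList(new byte[] {1, 2, 3}, new byte[] {4, 5});

        // One put, one ID, regardless of how many batches there are.
        store.put("sorted-output", pack(batches));

        List<byte[]> restored = unpack(store.get("sorted-output"));
        System.out.println(restored.size() + " payloads restored");
    }
}
```

With a real IPC stream the schema would be written once at the start, followed by every batch, so the metadata store only has to track one ID per sorted output rather than one per slice.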
On Tue, Feb 23, 2021 at 12:46 AM 鲁立君 wrote:
> Hi,
>
> I am a beginner with Arrow and Plasma, and I am trying to use them in my
> project to improve performance.
>
> In my project I slice the original VectorSchemaRoot into many sliced
> VectorSchemaRoots and store them in Plasma; the objectIds are the output
> of my task.
>
> Now I have a problem. For example, if I need to sort multiple
> VectorSchemaRoots globally and output them sequentially, my solution is to
> build a secondary index, sort it, slice the raw data, and then output it.
> So I end up with a lot of VectorSchemaRoots and a lot of objectIds. This
> puts a lot of pressure on my metadata store.
>
> I have a preliminary understanding of how Plasma is used to store an
> ArrowRecordBatch: I think it is a process of serializing the
> ArrowRecordBatch and deserializing it after reading it back. However, the
> serialization and deserialization utility classes currently provided by
> Arrow only allow storing or reading a single ArrowRecordBatch.
>
> Here is my sample code:
>
> // store a recordBatchList and get a list of objectIds
> public List<String> storeRecordBatch(Schema schema, List<ArrowRecordBatch> recordBatchList) throws IOException {
>     List<String> result = new ArrayList<>();
>     for (ArrowRecordBatch recordBatch : recordBatchList) {
>         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
>         WritableByteChannel writableByteChannel = Channels.newChannel(byteArrayOutputStream);
>         WriteChannel writeChannel = new WriteChannel(writableByteChannel);
>         MessageSerializer.serialize(writeChannel, schema);
>         MessageSerializer.serialize(writeChannel, recordBatch);
>
>         byte[] id = new byte[20];
>         do {
>             new Random().nextBytes(id);
>         } while (plasmaClient.contains(id));
>
>         plasmaClient.put(id, byteArrayOutputStream.toByteArray(), null);
>         assert plasmaClient.contains(id);
>         String objectId = ByteBufUtil.hexDump(id);
>         result.add(objectId);
>     }
>     return result;
> }
>
> // get a list of ArrowRecordBatch from a list of objectIds
> public List<ArrowRecordBatch> getRecordBatches(List<String> blockIdList, BufferAllocator allocator) throws IOException {
>     List<ArrowRecordBatch> recordBatchList = new ArrayList<>();
>     Schema schema = null;
>     for (String blockIdStr : blockIdList) {
>         byte[] blockId = ByteBufUtil.decodeHexDump(blockIdStr);
>         if (!plasmaClient.contains(blockId)) {
>             throw new CIMemStoreException(CiMmeStoreResultEnum.BlockNotFoundException);
>         }
>         ByteBuffer buffer = plasmaClient.getObjAsByteBuffer(blockId, 0, false);
>         byte[] bytes = new byte[buffer.capacity()];
>         buffer.get(bytes);
>         plasmaClient.release(blockId);
>         ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
>         ReadableByteChannel readableByteChannel = Channels.newChannel(byteArrayInputStream);
>         ReadChannel readChannel = new ReadChannel(readableByteChannel);
>         schema = MessageSerializer.deserializeSchema(readChannel);
>         ArrowRecordBatch arrowRecordBatch = MessageSerializer.deserializeRecordBatch(readChannel, allocator);
>         recordBatchList.add(arrowRecordBatch);
>     }
>     return recordBatchList;
> }
>
> Is there a way to put a list of ArrowRecordBatch into Plasma with one single
> id?
