Hi,
I am trying out Arrow Flight, and I have these two programs to write to and
read from a file:
import static java.util.Arrays.asList;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class WriteToBuffer {
    public static void main(String[] args) {
        WriteToBuffer wb = new WriteToBuffer();
        wb.execute1();
    }

    // Builds a 3-row (name, age) batch and writes it to streaming_to_file.arrow
    // with ArrowStreamWriter.
    public void execute1() {
        try (BufferAllocator rootAllocator = new RootAllocator()) {
            Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
            Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
            Schema schemaPerson = new Schema(asList(name, age));
            try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, rootAllocator)) {
                VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name");
                nameVector.allocateNew(3);
                nameVector.set(0, "David".getBytes());
                nameVector.set(1, "Gladis".getBytes());
                nameVector.set(2, "Juan".getBytes());
                IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age");
                ageVector.allocateNew(3);
                ageVector.set(0, 10);
                ageVector.set(1, 20);
                ageVector.set(2, 30);
                vectorSchemaRoot.setRowCount(3);
                File file = new File("streaming_to_file.arrow");
                try (
                    FileOutputStream fileOutputStream = new FileOutputStream(file);
                    ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, fileOutputStream.getChannel())
                ) {
                    writer.start();
                    System.out.println("Writing Batch");
                    writer.writeBatch();
                    System.out.println("Number of rows written: " + vectorSchemaRoot.getRowCount());
                    writer.end();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Builds the same batch and writes it to an in-memory buffer with ArrowFileWriter.
    // The writer and the allocator are closed by the try-with-resources blocks
    // before the returned writer reaches the caller.
    public ArrowFileWriter execute() {
        try (BufferAllocator allocator = new RootAllocator()) {
            Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
            Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
            Schema schemaPerson = new Schema(asList(name, age));
            try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, allocator)) {
                VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name");
                nameVector.allocateNew(3);
                nameVector.set(0, "David".getBytes());
                nameVector.set(1, "Gladis".getBytes());
                nameVector.set(2, "Juan".getBytes());
                IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age");
                ageVector.allocateNew(3);
                ageVector.set(0, 10);
                ageVector.set(1, 20);
                ageVector.set(2, 30);
                vectorSchemaRoot.setRowCount(3);
                try (
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    ArrowFileWriter writer = new ArrowFileWriter(vectorSchemaRoot, null, Channels.newChannel(out))
                ) {
                    writer.start();
                    writer.writeBatch();
                    System.out.println("Record batches written: " + writer.getRecordBlocks().size()
                            + ". Number of rows written: " + vectorSchemaRoot.getRowCount());
                    return writer;
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return null;
    }
}
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.SeekableReadChannel;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

public class ReadFromBuffer {
    public static void main(String[] args) {
        ReadFromBuffer rb = new ReadFromBuffer();
        rb.execute1();
    }

    // Opens streaming_to_file.arrow with ArrowFileReader and prints every record batch.
    public void execute1() {
        File file = new File("streaming_to_file.arrow");
        try (
            BufferAllocator rootAllocator = new RootAllocator();
            FileInputStream fileInputStream = new FileInputStream(file);
            ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), rootAllocator)
        ) {
            System.out.println("Record batches in file: " + reader.getRecordBlocks().size());
            for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
                reader.loadRecordBatch(arrowBlock);
                VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
                System.out.print(vectorSchemaRootRecover.contentToTSVString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Same read, but from an in-memory copy of the file's bytes.
    public void execute() {
        //Path path = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow");
        Path path = Paths.get("streaming_to_file.arrow");
        try (
            BufferAllocator rootAllocator = new RootAllocator();
            ArrowFileReader reader = new ArrowFileReader(
                new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(Files.readAllBytes(path))),
                rootAllocator)
        ) {
            //System.out.println("Record batches in file: " + reader.getRecordBlocks().size());
            while (reader.loadNextBatch()) {
                for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
                    reader.loadRecordBatch(arrowBlock);
                    VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
                    System.out.print(vectorSchemaRootRecover.contentToTSVString());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The write program successfully writes the Arrow file, but when I run the read
program I get this error:
(PyGDev1) C02G35CWMD6R:scala vsitaraman$ java -cp ./jars/chapter2-assembly-1.0.jar main.java.chapter2.ReadFromBuffer
ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
Exception in thread "main" org.apache.arrow.vector.ipc.InvalidArrowFileException: missing Magic number [0, 0, -1, -1, -1, -1, 0, 0, 0, 0]
    at org.apache.arrow.vector.ipc.ArrowFileReader.readSchema(ArrowFileReader.java:98)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:185)
    at org.apache.arrow.vector.ipc.ArrowFileReader.initialize(ArrowFileReader.java:120)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:176)
    at org.apache.arrow.vector.ipc.ArrowFileReader.getRecordBlocks(ArrowFileReader.java:183)
    at main.java.chapter2.ReadFromBuffer.execute1(ReadFromBuffer.java:32)
    at main.java.chapter2.ReadFromBuffer.main(ReadFromBuffer.java:22)
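For context on the error: ArrowFileReader checks for the 6-byte magic string ARROW1, which the Arrow IPC file (random-access) format writes at both the start and the end of the file. The IPC streaming format written by ArrowStreamWriter has no such magic; in current releases (Arrow 0.15 and later, as far as I know) each stream message is instead prefixed with the continuation marker 0xFFFFFFFF. The ten bytes printed in the exception, 0 0 -1 -1 -1 -1 0 0 0 0, appear to be the stream's end-of-stream marker (0xFFFFFFFF followed by a zero length), read from the end of the file where the reader expected the footer. Below is a minimal, dependency-free sketch that guesses the format from a file's first bytes; the class ArrowFormatSniffer and its methods are hypothetical helpers, not part of Arrow.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

/** Hypothetical helper: guesses which Arrow IPC format a file uses from its first bytes. */
public class ArrowFormatSniffer {

    /** Magic string at the start (and end) of the Arrow IPC file format. */
    private static final byte[] FILE_MAGIC = "ARROW1".getBytes(StandardCharsets.US_ASCII);

    /** Continuation marker that prefixes each message in the (post-0.15) IPC stream format. */
    private static final byte[] STREAM_CONTINUATION = {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};

    /** Returns "file", "stream" or "unknown" based on the leading bytes. */
    public static String detectFormat(byte[] header) {
        if (startsWith(header, FILE_MAGIC)) {
            return "file";
        }
        if (startsWith(header, STREAM_CONTINUATION)) {
            return "stream";
        }
        // Streams from pre-0.15 writers have no continuation marker and land here.
        return "unknown";
    }

    private static boolean startsWith(byte[] data, byte[] prefix) {
        return data.length >= prefix.length
                && Arrays.equals(Arrays.copyOf(data, prefix.length), prefix);
    }

    public static void main(String[] args) throws IOException {
        Path path = Paths.get(args.length > 0 ? args[0] : "streaming_to_file.arrow");
        byte[] header = new byte[8];
        int n;
        try (InputStream in = Files.newInputStream(path)) {
            n = in.readNBytes(header, 0, header.length);  // Java 9+
        }
        System.out.println(path + ": " + detectFormat(Arrays.copyOf(header, n)));
    }
}
```

A file that begins with ffff ffff, as a stream from a recent ArrowStreamWriter does, is reported as "stream"; one written by ArrowFileWriter begins with ARROW1 and is reported as "file".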
Here is a hex dump of the Arrow file that was written:
00000000: ffff ffff c800 0000 1000 0000 0000 0a00 ................
00000010: 0e00 0600 0d00 0800 0a00 0000 0000 0400 ................
00000020: 1000 0000 0001 0a00 0c00 0000 0800 0400 ................
00000030: 0a00 0000 0800 0000 0800 0000 0000 0000 ................
00000040: 0200 0000 5800 0000 0400 0000 c2ff ffff ....X...........
00000050: 1400 0000 1400 0000 1c00 0000 0000 0201 ................
00000060: 2000 0000 0000 0000 0000 0000 0800 0c00 ...............
00000070: 0800 0700 0800 0000 0000 0001 2000 0000 ............ ...
00000080: 0300 0000 6167 6500 0000 1200 1800 1400 ....age.........
00000090: 1300 1200 0c00 0000 0800 0400 1200 0000 ................
000000a0: 1400 0000 1400 0000 1800 0000 0000 0501 ................
000000b0: 1400 0000 0000 0000 0000 0000 0400 0400 ................
000000c0: 0400 0000 0400 0000 6e61 6d65 0000 0000 ........name....
000000d0: ffff ffff c800 0000 1400 0000 0000 0000 ................
000000e0: 0c00 1600 0e00 1500 1000 0400 0c00 0000 ................
(8 more lines of the dump omitted)
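The dump starts with ffff ffff c800 0000, that is, the stream continuation marker followed by a 200-byte (0xc8) metadata length. This is consistent with the file being in the streaming format written by ArrowStreamWriter rather than the file format ArrowFileReader expects, so the reader and writer need to agree. Below is a sketch of both options, assuming a recent Arrow Java release: read the existing file with ArrowStreamReader, or write it with ArrowFileWriter so that ArrowFileReader can open it. The class MatchingReaderWriter and the file name random_access_to_file.arrow are illustrative only.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class MatchingReaderWriter {

    /** Option 1: keep ArrowStreamWriter and read the stream with ArrowStreamReader. */
    public static int readStream(File file, BufferAllocator allocator) throws IOException {
        int rows = 0;
        try (FileInputStream in = new FileInputStream(file);
             ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
            VectorSchemaRoot root = reader.getVectorSchemaRoot();
            while (reader.loadNextBatch()) {  // refills 'root' with the next record batch
                rows += root.getRowCount();
                System.out.print(root.contentToTSVString());
            }
        }
        return rows;
    }

    /** Option 2: write the random-access file format so ArrowFileReader can open it. */
    public static void writeFile(VectorSchemaRoot root, File file) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file);
             ArrowFileWriter writer = new ArrowFileWriter(root, null, out.getChannel())) {
            writer.start();
            writer.writeBatch();
            writer.end();  // writes the footer and the trailing ARROW1 magic
        }
    }

    /** Reads a file-format file block by block, as ReadFromBuffer.execute1 does. */
    public static int readFile(File file, BufferAllocator allocator) throws IOException {
        int rows = 0;
        try (FileInputStream in = new FileInputStream(file);
             ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) {
            for (ArrowBlock block : reader.getRecordBlocks()) {
                reader.loadRecordBatch(block);
                rows += reader.getVectorSchemaRoot().getRowCount();
            }
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        Schema schema = new Schema(Arrays.asList(
                new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null)));
        File file = new File("random_access_to_file.arrow");  // illustrative file name
        try (BufferAllocator allocator = new RootAllocator()) {
            try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
                IntVector age = (IntVector) root.getVector("age");
                age.allocateNew(3);
                age.set(0, 10);
                age.set(1, 20);
                age.set(2, 30);
                root.setRowCount(3);
                writeFile(root, file);
            }
            System.out.println("Rows read back: " + readFile(file, allocator));
        }
    }
}
```

ArrowStreamReader reuses a single VectorSchemaRoot that loadNextBatch() refills, which suits sequential reads. The file format adds a footer listing the record-block offsets (what getRecordBlocks() returns), and that footer is what makes random access through loadRecordBatch() possible.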
What could I be missing?
Thanks,
Sitaraman