Repository: arrow
Updated Branches:
  refs/heads/master be5d73f2c -> 0ae4d86e5
ARROW-497: Integration harness for streaming file format These tests pass locally for me. Thanks @nongli for this! Author: Nong Li <non...@gmail.com> Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #312 from wesm/streaming-integration and squashes the following commits: 8b9ad76 [Wes McKinney] Hook stream<->file tools together and get integration tests working. Quiet test output in TestArrowStreamPipe c7f0483 [Nong Li] ARROW-XXX: [Java] Add file <=> stream utility tools. Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/0ae4d86e Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/0ae4d86e Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/0ae4d86e Branch: refs/heads/master Commit: 0ae4d86e5ef8ee53a8810f4324dce80ec6a9d422 Parents: be5d73f Author: Nong Li <non...@gmail.com> Authored: Thu Feb 2 14:36:23 2017 +0100 Committer: Uwe L. Korn <uw...@xhochy.com> Committed: Thu Feb 2 14:36:23 2017 +0100 ---------------------------------------------------------------------- ci/travis_script_integration.sh | 3 + integration/integration_test.py | 76 ++++++++++++++++---- .../org/apache/arrow/tools/FileToStream.java | 68 ++++++++++++++++++ .../org/apache/arrow/tools/StreamToFile.java | 61 ++++++++++++++++ .../arrow/vector/stream/MessageSerializer.java | 2 +- .../vector/stream/TestArrowStreamPipe.java | 2 +- 6 files changed, 198 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/ci/travis_script_integration.sh ---------------------------------------------------------------------- diff --git a/ci/travis_script_integration.sh b/ci/travis_script_integration.sh index d93411b..c019a4b 100755 --- a/ci/travis_script_integration.sh +++ b/ci/travis_script_integration.sh @@ -28,7 +28,10 @@ pushd $TRAVIS_BUILD_DIR/integration VERSION=0.1.1-SNAPSHOT export 
ARROW_JAVA_INTEGRATION_JAR=$JAVA_DIR/tools/target/arrow-tools-$VERSION-jar-with-dependencies.jar + export ARROW_CPP_TESTER=$CPP_BUILD_DIR/debug/json-integration-test +export ARROW_CPP_STREAM_TO_FILE=$CPP_BUILD_DIR/debug/stream-to-file +export ARROW_CPP_FILE_TO_STREAM=$CPP_BUILD_DIR/debug/file-to-stream source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh export MINICONDA=$HOME/miniconda http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/integration/integration_test.py ---------------------------------------------------------------------- diff --git a/integration/integration_test.py b/integration/integration_test.py index 77510da..a622bf2 100644 --- a/integration/integration_test.py +++ b/integration/integration_test.py @@ -556,12 +556,25 @@ class IntegrationRunner(object): consumer.name)) for json_path in self.json_files: - print('Testing with {0}'.format(json_path)) + print('Testing file {0}'.format(json_path)) - arrow_path = os.path.join(self.temp_dir, guid()) + # Make the random access file + print('-- Creating binary inputs') + producer_file_path = os.path.join(self.temp_dir, guid()) + producer.json_to_file(json_path, producer_file_path) - producer.json_to_arrow(json_path, arrow_path) - consumer.validate(json_path, arrow_path) + # Validate the file + print('-- Validating file') + consumer.validate(json_path, producer_file_path) + + print('-- Validating stream') + producer_stream_path = os.path.join(self.temp_dir, guid()) + consumer_file_path = os.path.join(self.temp_dir, guid()) + producer.file_to_stream(producer_file_path, + producer_stream_path) + consumer.stream_to_file(producer_stream_path, + consumer_file_path) + consumer.validate(json_path, consumer_file_path) class Tester(object): @@ -569,7 +582,13 @@ class Tester(object): def __init__(self, debug=False): self.debug = debug - def json_to_arrow(self, json_path, arrow_path): + def json_to_file(self, json_path, arrow_path): + raise NotImplementedError + + def stream_to_file(self, stream_path, 
file_path): + raise NotImplementedError + + def file_to_stream(self, file_path, stream_path): raise NotImplementedError def validate(self, json_path, arrow_path): @@ -601,21 +620,40 @@ class JavaTester(Tester): if self.debug: print(' '.join(cmd)) - return run_cmd(cmd) + run_cmd(cmd) def validate(self, json_path, arrow_path): return self._run(arrow_path, json_path, 'VALIDATE') - def json_to_arrow(self, json_path, arrow_path): + def json_to_file(self, json_path, arrow_path): return self._run(arrow_path, json_path, 'JSON_TO_ARROW') + def stream_to_file(self, stream_path, file_path): + cmd = ['java', '-cp', self.ARROW_TOOLS_JAR, + 'org.apache.arrow.tools.StreamToFile', + stream_path, file_path] + run_cmd(cmd) + + def file_to_stream(self, file_path, stream_path): + cmd = ['java', '-cp', self.ARROW_TOOLS_JAR, + 'org.apache.arrow.tools.FileToStream', + file_path, stream_path] + run_cmd(cmd) + class CPPTester(Tester): + BUILD_PATH = os.path.join(ARROW_HOME, 'cpp/test-build/debug') CPP_INTEGRATION_EXE = os.environ.get( - 'ARROW_CPP_TESTER', - os.path.join(ARROW_HOME, - 'cpp/test-build/debug/json-integration-test')) + 'ARROW_CPP_TESTER', os.path.join(BUILD_PATH, 'json-integration-test')) + + STREAM_TO_FILE = os.environ.get( + 'ARROW_CPP_STREAM_TO_FILE', + os.path.join(BUILD_PATH, 'stream-to-file')) + + FILE_TO_STREAM = os.environ.get( + 'ARROW_CPP_FILE_TO_STREAM', + os.path.join(BUILD_PATH, 'file-to-stream')) name = 'C++' @@ -633,14 +671,28 @@ class CPPTester(Tester): if self.debug: print(' '.join(cmd)) - return run_cmd(cmd) + run_cmd(cmd) def validate(self, json_path, arrow_path): return self._run(arrow_path, json_path, 'VALIDATE') - def json_to_arrow(self, json_path, arrow_path): + def json_to_file(self, json_path, arrow_path): return self._run(arrow_path, json_path, 'JSON_TO_ARROW') + def stream_to_file(self, stream_path, file_path): + cmd = ['cat', stream_path, '|', self.STREAM_TO_FILE, '>', file_path] + cmd = ' '.join(cmd) + if self.debug: + print(cmd) + os.system(cmd) 
+ + def file_to_stream(self, file_path, stream_path): + cmd = [self.FILE_TO_STREAM, file_path, '>', stream_path] + cmd = ' '.join(cmd) + if self.debug: + print(cmd) + os.system(cmd) + def get_static_json_files(): glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json') http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java new file mode 100644 index 0000000..ba6505c --- /dev/null +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.file.ArrowBlock; +import org.apache.arrow.vector.file.ArrowFooter; +import org.apache.arrow.vector.file.ArrowReader; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.stream.ArrowStreamWriter; + +/** + * Converts an Arrow file to an Arrow stream. The file should be specified as the + * first argument and the output is written to standard out. + */ +public class FileToStream { + public static void convert(FileInputStream in, OutputStream out) throws IOException { + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + try( + ArrowReader reader = new ArrowReader(in.getChannel(), allocator);) { + ArrowFooter footer = reader.readFooter(); + try ( + ArrowStreamWriter writer = new ArrowStreamWriter(out, footer.getSchema()); + ) { + for (ArrowBlock block: footer.getRecordBatches()) { + try (ArrowRecordBatch batch = reader.readRecordBatch(block)) { + writer.writeRecordBatch(batch); + } + } + } + } + } + + public static void main(String[] args) throws IOException { + if (args.length != 1 && args.length != 2) { + System.err.println("Usage: FileToStream <input file> [output file]"); + System.exit(1); + } + + FileInputStream in = new FileInputStream(new File(args[0])); + OutputStream out = args.length == 1 ? 
+ System.out : new FileOutputStream(new File(args[1])); + + convert(in, out); + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java ---------------------------------------------------------------------- diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java new file mode 100644 index 0000000..c8a5c89 --- /dev/null +++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.file.ArrowWriter; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.stream.ArrowStreamReader; + +/** + * Converts an Arrow stream to an Arrow file. 
+ */ +public class StreamToFile { + public static void convert(InputStream in, OutputStream out) throws IOException { + BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) { + reader.init(); + try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), reader.getSchema());) { + while (true) { + ArrowRecordBatch batch = reader.nextRecordBatch(); + if (batch == null) break; + writer.writeRecordBatch(batch); + } + } + } + } + + public static void main(String[] args) throws IOException { + InputStream in = System.in; + OutputStream out = System.out; + if (args.length == 2) { + in = new FileInputStream(new File(args[0])); + out = new FileOutputStream(new File(args[1])); + } + convert(in, out); + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index 7ab740c..92df250 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -226,7 +226,7 @@ public class MessageSerializer { Message.startMessage(builder); Message.addHeaderType(builder, headerType); Message.addHeader(builder, headerOffset); - Message.addVersion(builder, MetadataVersion.V1); + Message.addVersion(builder, MetadataVersion.V2); Message.addBodyLength(builder, bodyLength); builder.finish(Message.endMessage(builder)); return builder.dataBuffer(); http://git-wip-us.apache.org/repos/asf/arrow/blob/0ae4d86e/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java ---------------------------------------------------------------------- diff --git 
a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java index a0a7ffa..aa0b77e 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java @@ -113,7 +113,7 @@ public class TestArrowStreamPipe { // Starts up a producer and consumer thread to read/write batches. @Test public void pipeTest() throws IOException, InterruptedException { - int NUM_BATCHES = 1000; + int NUM_BATCHES = 10; Pipe pipe = Pipe.open(); WriterThread writer = new WriterThread(NUM_BATCHES, pipe.sink()); ReaderThread reader = new ReaderThread(pipe.source());