This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 148213e ARROW-3162: Flight Python bindings
148213e is described below
commit 148213e3befb02ae345cf5f62cb40d7f8720178a
Author: David Li <[email protected]>
AuthorDate: Thu Feb 14 18:16:56 2019 +0100
ARROW-3162: Flight Python bindings
- [ ] Add docs
- [ ] Format code
- [ ] Include Python in integration tests (requires binding the JSON
reader/writer from C++)
- [ ] Validate performance?
- [ ] Complete server bindings if approach makes sense
Author: David Li <[email protected]>
Author: Wes McKinney <[email protected]>
Closes #3566 from lihalite/flight-python and squashes the following commits:
ac29ab88 <David Li> Clean up to-be-implemented parts of Flight Python
bindings
9d5442a0 <David Li> Clarify various RecordBatchStream{Reader,Writer}
wrappers
e1c298ad <David Li> Lint CMake files
77644447 <Wes McKinney> Reformat cmake
c6b02aa9 <David Li> Add basic Python bindings for Flight
---
cpp/cmake_modules/FindArrow.cmake | 14 +
cpp/src/arrow/flight/CMakeLists.txt | 2 +-
cpp/src/arrow/python/CMakeLists.txt | 13 +
cpp/src/arrow/python/flight.cc | 86 +++++
cpp/src/arrow/python/flight.h | 79 +++++
python/CMakeLists.txt | 25 ++
python/examples/flight/client.py | 139 ++++++++
python/examples/flight/server.py | 56 ++++
python/pyarrow/_flight.pyx | 476 ++++++++++++++++++++++++++++
python/pyarrow/flight.py | 20 ++
python/pyarrow/includes/common.pxd | 8 +-
python/pyarrow/includes/libarrow_flight.pxd | 142 +++++++++
python/pyarrow/ipc.pxi | 94 ++++--
python/pyarrow/ipc.py | 4 +-
python/pyarrow/lib.pxd | 10 +
python/requirements.txt | 1 +
python/setup.py | 13 +-
17 files changed, 1141 insertions(+), 41 deletions(-)
diff --git a/cpp/cmake_modules/FindArrow.cmake
b/cpp/cmake_modules/FindArrow.cmake
index f4b0a81..3d5fd8e 100644
--- a/cpp/cmake_modules/FindArrow.cmake
+++ b/cpp/cmake_modules/FindArrow.cmake
@@ -79,6 +79,14 @@ find_library(ARROW_PYTHON_LIB_PATH NAMES arrow_python
NO_DEFAULT_PATH)
get_filename_component(ARROW_PYTHON_LIBS ${ARROW_PYTHON_LIB_PATH} DIRECTORY)
+if (PYARROW_BUILD_FLIGHT)
+  # Look for the arrow_flight library only under ARROW_SEARCH_LIB_PATH
+  # (NO_DEFAULT_PATH disables the default search locations), then store
+  # the directory containing it in ARROW_FLIGHT_LIBS.
+  find_library(ARROW_FLIGHT_LIB_PATH
+    NAMES arrow_flight
+    PATHS ${ARROW_SEARCH_LIB_PATH}
+    NO_DEFAULT_PATH)
+  get_filename_component(ARROW_FLIGHT_LIBS
+    ${ARROW_FLIGHT_LIB_PATH}
+    DIRECTORY)
+endif()
+
if (MSVC)
SET(CMAKE_FIND_LIBRARY_SUFFIXES ".lib" ".dll")
@@ -101,19 +109,25 @@ if (ARROW_INCLUDE_DIR AND ARROW_LIBS)
set(ARROW_FOUND TRUE)
set(ARROW_LIB_NAME arrow)
set(ARROW_PYTHON_LIB_NAME arrow_python)
+ set(ARROW_FLIGHT_LIB_NAME arrow_flight)
if (MSVC)
set(ARROW_STATIC_LIB
${ARROW_LIBS}/${ARROW_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
set(ARROW_PYTHON_STATIC_LIB
${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
+ set(ARROW_FLIGHT_STATIC_LIB
${ARROW_FLIGHT_LIBS}/${ARROW_FLIGHT_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
set(ARROW_SHARED_LIB
${ARROW_SHARED_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_PYTHON_SHARED_LIB
${ARROW_PYTHON_SHARED_LIBS}/${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+ set(ARROW_FLIGHT_SHARED_LIB
${ARROW_FLIGHT_SHARED_LIBS}/${ARROW_FLIGHT_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_SHARED_IMP_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}.lib)
set(ARROW_PYTHON_SHARED_IMP_LIB
${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}.lib)
+ set(ARROW_FLIGHT_SHARED_IMP_LIB
${ARROW_FLIGHT_LIBS}/${ARROW_FLIGHT_LIB_NAME}.lib)
else()
set(ARROW_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}.a)
set(ARROW_PYTHON_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}.a)
+ set(ARROW_FLIGHT_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_FLIGHT_LIB_NAME}.a)
set(ARROW_SHARED_LIB
${ARROW_LIBS}/lib${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_PYTHON_SHARED_LIB
${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+ set(ARROW_FLIGHT_SHARED_LIB
${ARROW_LIBS}/lib${ARROW_FLIGHT_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
endif()
endif()
diff --git a/cpp/src/arrow/flight/CMakeLists.txt
b/cpp/src/arrow/flight/CMakeLists.txt
index 9183e26..a51f4fe 100644
--- a/cpp/src/arrow/flight/CMakeLists.txt
+++ b/cpp/src/arrow/flight/CMakeLists.txt
@@ -21,7 +21,7 @@ add_custom_target(arrow_flight)
arrow_install_all_headers("arrow/flight")
set(ARROW_FLIGHT_STATIC_LINK_LIBS
- protobuf_static
+ ${PROTOBUF_LIBRARY}
grpc_grpcpp_static
grpc_grpc_static
grpc_gpr_static
diff --git a/cpp/src/arrow/python/CMakeLists.txt
b/cpp/src/arrow/python/CMakeLists.txt
index 93dbd66..9cf7eeb 100644
--- a/cpp/src/arrow/python/CMakeLists.txt
+++ b/cpp/src/arrow/python/CMakeLists.txt
@@ -44,12 +44,25 @@ set(ARROW_PYTHON_SRCS
pyarrow.cc
serialize.cc)
+if(ARROW_FLIGHT)
+ set(ARROW_PYTHON_SRCS ${ARROW_PYTHON_SRCS} flight.cc)
+endif()
+
if("${COMPILER_FAMILY}" STREQUAL "clang")
set_property(SOURCE pyarrow.cc APPEND_STRING PROPERTY COMPILE_FLAGS "
-Wno-cast-qual ")
endif()
set(ARROW_PYTHON_SHARED_LINK_LIBS arrow_shared ${PYTHON_OTHER_LIBS})
+if(ARROW_FLIGHT)
+ # Must link shared: we don't want to link more than one copy of gRPC
+ # into the eventual Cython shared object, otherwise gRPC calls fail
+ # with weird errors due to multiple copies of global static state
+ # (The other solution is to link gRPC shared everywhere instead of
+ # statically only in Flight)
+ set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS}
arrow_flight_shared)
+endif()
+
if(WIN32)
set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS}
${PYTHON_LIBRARIES})
endif()
diff --git a/cpp/src/arrow/python/flight.cc b/cpp/src/arrow/python/flight.cc
new file mode 100644
index 0000000..5550333
--- /dev/null
+++ b/cpp/src/arrow/python/flight.cc
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <utility>
+
+#include "arrow/flight/internal.h"
+#include "arrow/python/flight.h"
+
+namespace arrow {
+namespace py {
+namespace flight {
+
+PyFlightServer::PyFlightServer(PyObject* server, PyFlightServerVtable vtable)
+ : vtable_(vtable) {
+ Py_INCREF(server);
+ server_.reset(server);
+}
+
+Status PyFlightServer::ListFlights(
+ const arrow::flight::Criteria* criteria,
+ std::unique_ptr<arrow::flight::FlightListing>* listings) {
+ return Status::NotImplemented("NYI");
+}
+
+Status PyFlightServer::GetFlightInfo(const arrow::flight::FlightDescriptor&
request,
+
std::unique_ptr<arrow::flight::FlightInfo>* info) {
+ PyAcquireGIL lock;
+ vtable_.get_flight_info(server_.obj(), request, info);
+ return CheckPyError();
+}
+
+Status PyFlightServer::DoGet(const arrow::flight::Ticket& request,
+ std::unique_ptr<arrow::flight::FlightDataStream>*
stream) {
+ PyAcquireGIL lock;
+ vtable_.do_get(server_.obj(), request, stream);
+ return CheckPyError();
+}
+
+Status
PyFlightServer::DoPut(std::unique_ptr<arrow::flight::FlightMessageReader>
reader) {
+ PyAcquireGIL lock;
+ vtable_.do_put(server_.obj(), std::move(reader));
+ return CheckPyError();
+}
+
+Status PyFlightServer::DoAction(const arrow::flight::Action& action,
+ std::unique_ptr<arrow::flight::ResultStream>*
result) {
+ return Status::NotImplemented("NYI");
+}
+
+Status PyFlightServer::ListActions(std::vector<arrow::flight::ActionType>*
actions) {
+ return Status::NotImplemented("NYI");
+}
+
+/// \brief Build a FlightInfo from a schema, descriptor, endpoints and totals.
+///
+/// \param[in] schema the schema to serialize into the FlightInfo
+/// \param[in] descriptor the descriptor copied into the FlightInfo
+/// \param[in] endpoints the endpoints copied into the FlightInfo
+/// \param[in] total_records the record count stored in the FlightInfo
+/// \param[in] total_bytes the byte count stored in the FlightInfo
+/// \param[out] out receives the newly allocated FlightInfo
+/// \return the error from schema serialization, or Status::OK()
+Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
+                        const arrow::flight::FlightDescriptor& descriptor,
+                        const std::vector<arrow::flight::FlightEndpoint>& endpoints,
+                        uint64_t total_records, uint64_t total_bytes,
+                        std::unique_ptr<arrow::flight::FlightInfo>* out) {
+  arrow::flight::FlightInfo::Data flight_data;
+  // Serialize the schema first so a failure returns before `out` is touched.
+  RETURN_NOT_OK(
+      arrow::flight::internal::SchemaToString(*schema, &flight_data.schema));
+  flight_data.descriptor = descriptor;
+  flight_data.endpoints = endpoints;
+  flight_data.total_records = total_records;
+  flight_data.total_bytes = total_bytes;
+  // Construct the result directly on the heap instead of building a
+  // temporary FlightInfo and copying it.
+  out->reset(new arrow::flight::FlightInfo(flight_data));
+  return Status::OK();
+}
+
+} // namespace flight
+} // namespace py
+} // namespace arrow
diff --git a/cpp/src/arrow/python/flight.h b/cpp/src/arrow/python/flight.h
new file mode 100644
index 0000000..774cf70
--- /dev/null
+++ b/cpp/src/arrow/python/flight.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PYARROW_FLIGHT_H
+#define PYARROW_FLIGHT_H
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "arrow/flight/api.h"
+#include "arrow/python/common.h"
+#include "arrow/python/config.h"
+
+namespace arrow {
+
+namespace py {
+
+namespace flight {
+
+/// \brief A table of function pointers for calling from C++ into
+/// Python.
+class ARROW_PYTHON_EXPORT PyFlightServerVtable {
+ public:
+ std::function<void(PyObject*, const arrow::flight::FlightDescriptor&,
+ std::unique_ptr<arrow::flight::FlightInfo>*)>
+ get_flight_info;
+ std::function<void(PyObject*,
std::unique_ptr<arrow::flight::FlightMessageReader>)>
+ do_put;
+ std::function<void(PyObject*, const arrow::flight::Ticket&,
+ std::unique_ptr<arrow::flight::FlightDataStream>*)>
+ do_get;
+};
+
+class ARROW_PYTHON_EXPORT PyFlightServer : public
arrow::flight::FlightServerBase {
+ public:
+ explicit PyFlightServer(PyObject* server, PyFlightServerVtable vtable);
+
+ Status ListFlights(const arrow::flight::Criteria* criteria,
+ std::unique_ptr<arrow::flight::FlightListing>* listings)
override;
+ Status GetFlightInfo(const arrow::flight::FlightDescriptor& request,
+ std::unique_ptr<arrow::flight::FlightInfo>* info)
override;
+ Status DoGet(const arrow::flight::Ticket& request,
+ std::unique_ptr<arrow::flight::FlightDataStream>* stream)
override;
+ Status DoPut(std::unique_ptr<arrow::flight::FlightMessageReader> reader)
override;
+ Status DoAction(const arrow::flight::Action& action,
+ std::unique_ptr<arrow::flight::ResultStream>* result)
override;
+ Status ListActions(std::vector<arrow::flight::ActionType>* actions) override;
+
+ private:
+ OwnedRefNoGIL server_;
+ PyFlightServerVtable vtable_;
+};
+
+ARROW_PYTHON_EXPORT
+Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
+ const arrow::flight::FlightDescriptor& descriptor,
+ const std::vector<arrow::flight::FlightEndpoint>&
endpoints,
+ uint64_t total_records, uint64_t total_bytes,
+ std::unique_ptr<arrow::flight::FlightInfo>* out);
+
+} // namespace flight
+} // namespace py
+} // namespace arrow
+
+#endif // PYARROW_FLIGHT_H
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 0559261..63a8cd0 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -59,6 +59,7 @@ endif()
# Top level cmake dir
if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
+ option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
option(PYARROW_BUILD_GANDIVA "Build the PyArrow Gandiva integration" OFF)
option(PYARROW_BUILD_PARQUET "Build the PyArrow Parquet integration" OFF)
option(PYARROW_PARQUET_USE_SHARED "Rely on parquet shared libraries where
relevant" ON)
@@ -191,6 +192,10 @@ include_directories(SYSTEM ${NUMPY_INCLUDE_DIRS}
${PYTHON_INCLUDE_DIRS} src)
# Dependencies
#
+if(PYARROW_BUILD_FLIGHT)
+ set(ARROW_FLIGHT TRUE)
+endif()
+
# Arrow
find_package(Arrow REQUIRED)
include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
@@ -352,9 +357,15 @@ endif()
if(MSVC)
add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_IMP_LIB})
add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB})
+ if(PYARROW_BUILD_FLIGHT)
+ add_thirdparty_lib(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_IMP_LIB})
+ endif()
else()
add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_LIB})
add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_LIB})
+ if(PYARROW_BUILD_FLIGHT)
+ add_thirdparty_lib(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_LIB})
+ endif()
endif()
#
@@ -474,6 +485,20 @@ if(PYARROW_BUILD_ORC)
set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _orc)
endif()
+# Flight
+if(PYARROW_BUILD_FLIGHT)
+  if(PYARROW_BUNDLE_ARROW_CPP)
+    # TODO: support bundling arrow-flight when PYARROW_BUNDLE_ARROW_CPP is set
+    message(FATAL_ERROR "Not yet implemented: bundling arrow-flight in pyarrow")
+  endif()
+  # We do NOT want to link gRPC or any other Flight dependency
+  # here. Linking more than one copy leads to odd runtime errors due
+  # to multiple copies of static global state. Thus we also need to
+  # link Flight as a shared object.
+  list(APPEND LINK_LIBS arrow_flight_shared)
+  # Build the _flight Cython extension along with the other extensions.
+  list(APPEND CYTHON_EXTENSIONS _flight)
+endif()
+
# Gandiva
if(PYARROW_BUILD_GANDIVA)
find_package(Gandiva)
diff --git a/python/examples/flight/client.py b/python/examples/flight/client.py
new file mode 100644
index 0000000..df9acc1
--- /dev/null
+++ b/python/examples/flight/client.py
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example Flight CLI client."""
+
+import argparse
+import sys
+
+import pyarrow
+import pyarrow.flight
+
+
+def list_flights(args, client):
+ print('Flights\n=======')
+ for flight in client.list_flights():
+ descriptor = flight.descriptor
+ if descriptor.descriptor_type == pyarrow.flight.DescriptorType.PATH:
+ print("Path:", descriptor.path)
+ elif descriptor.descriptor_type == pyarrow.flight.DescriptorType.CMD:
+ print("Command:", descriptor.command)
+ else:
+ print("Unknown descriptor type")
+
+ print("Total records:", end=" ")
+ if flight.total_records >= 0:
+ print(flight.total_records)
+ else:
+ print("Unknown")
+
+ print("Total bytes:", end=" ")
+ if flight.total_bytes >= 0:
+ print(flight.total_bytes)
+ else:
+ print("Unknown")
+
+ print("Number of endpoints:", len(flight.endpoints))
+
+ if args.list:
+ print(flight.schema)
+
+ print('---')
+
+ print('\nActions\n=======')
+ for action in client.list_actions():
+ print("Type:", action.type)
+ print("Description:", action.description)
+ print('---')
+
+
+def do_action(args, client):
+ try:
+ buf = pyarrow.allocate_buffer(0)
+ action = pyarrow.flight.Action(args.action_type, buf)
+ print('Running action', args.action_type)
+ for result in client.do_action(action):
+ print("Got result", result.body.to_pybytes())
+ except pyarrow.lib.ArrowIOError as e:
+ print("Error calling action:", e)
+
+
+def get_flight(args, client):
+ if args.path:
+ descriptor = pyarrow.flight.FlightDescriptor.for_path(*args.path)
+ else:
+ descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command)
+
+ info = client.get_flight_info(descriptor)
+ for endpoint in info.endpoints:
+ print('Ticket:', endpoint.ticket)
+ for location in endpoint.locations:
+ print(location)
+ get_client = pyarrow.flight.FlightClient.connect(location)
+ reader = get_client.do_get(endpoint.ticket, info.schema)
+ df = reader.read_pandas()
+ print(df)
+
+
+def _add_common_arguments(parser):
+ parser.add_argument('host', type=str,
+ help="The host to connect to.")
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ subcommands = parser.add_subparsers()
+
+ cmd_list = subcommands.add_parser('list')
+ cmd_list.set_defaults(action='list')
+ _add_common_arguments(cmd_list)
+ cmd_list.add_argument('-l', '--list', action='store_true',
+ help="Print more details.")
+
+ cmd_do = subcommands.add_parser('do')
+ cmd_do.set_defaults(action='do')
+ _add_common_arguments(cmd_do)
+ cmd_do.add_argument('action_type', type=str,
+ help="The action type to run.")
+
+ cmd_get = subcommands.add_parser('get')
+ cmd_get.set_defaults(action='get')
+ _add_common_arguments(cmd_get)
+ cmd_get_descriptor = cmd_get.add_mutually_exclusive_group(required=True)
+ cmd_get_descriptor.add_argument('-p', '--path', type=str, action='append',
+ help="The path for the descriptor.")
+ cmd_get_descriptor.add_argument('-c', '--command', type=str,
+ help="The command for the descriptor.")
+
+ args = parser.parse_args()
+ if not hasattr(args, 'action'):
+ parser.print_help()
+ sys.exit(1)
+
+ commands = {
+ 'list': list_flights,
+ 'do': do_action,
+ 'get': get_flight,
+ }
+ host, port = args.host.split(':')
+ port = int(port)
+ client = pyarrow.flight.FlightClient.connect(host, port)
+ commands[args.action](args, client)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/examples/flight/server.py b/python/examples/flight/server.py
new file mode 100644
index 0000000..cf8668a
--- /dev/null
+++ b/python/examples/flight/server.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example Flight Python server."""
+
+import pyarrow
+import pyarrow.flight
+
+
+class FlightServer(pyarrow.flight.FlightServerBase):
+ def __init__(self):
+ super(FlightServer, self).__init__()
+ self.flights = {}
+
+ @classmethod
+ def descriptor_to_key(self, descriptor):
+ return (descriptor.descriptor_type, descriptor.command,
+ tuple(descriptor.path or tuple()))
+
+ def get_flight_info(self, descriptor):
+ key = FlightServer.descriptor_to_key(descriptor)
+ if key in self.flights:
+ table = self.flights[key]
+ return pyarrow.flight.FlightInfo(table.schema,
+ descriptor, [],
+ table.num_rows, 0)
+ raise KeyError('Flight not found.')
+
+ def do_put(self, descriptor, reader):
+ key = FlightServer.descriptor_to_key(descriptor)
+ print(key)
+ self.flights[key] = reader.read_all()
+ print(self.flights[key])
+
+
+def main():
+ server = FlightServer()
+ server.run(5005)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
new file mode 100644
index 0000000..4819171
--- /dev/null
+++ b/python/pyarrow/_flight.pyx
@@ -0,0 +1,476 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import collections
+import enum
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.compat import frombytes, tobytes
+from pyarrow.lib cimport *
+from pyarrow.lib import as_buffer
+from pyarrow.includes.libarrow_flight cimport *
+from pyarrow.ipc import _ReadPandasOption
+import pyarrow.lib as lib
+
+
+cdef class Action:
+ """An action executable on a Flight service."""
+ cdef:
+ CAction action
+
+ def __init__(self, action_type, buf):
+ self.action.type = tobytes(action_type)
+ self.action.body = pyarrow_unwrap_buffer(as_buffer(buf))
+
+ @property
+ def type(self):
+ return frombytes(self.action.type)
+
+ def body(self):
+ return pyarrow_wrap_buffer(self.action.body)
+
+
+cdef class ActionType:
+ """A type of action executable on a Flight service."""
+ cdef:
+ CActionType action_type
+
+ @property
+ def type(self):
+ return frombytes(self.action_type.type)
+
+ @property
+ def description(self):
+ return frombytes(self.action_type.description)
+
+ def make_action(self, buf):
+ """Create an Action with this type."""
+ return Action(self.type, buf)
+
+ def __repr__(self):
+ return '<ActionType type={} description={}>'.format(
+ self.type, self.description)
+
+
+cdef class Result:
+ """A result from executing an Action."""
+ cdef:
+ unique_ptr[CResult] result
+
+ @property
+ def body(self):
+ """Get the Buffer containing the result."""
+ return pyarrow_wrap_buffer(self.result.get().body)
+
+
+class DescriptorType(enum.Enum):
+ UNKNOWN = 0
+ PATH = 1
+ CMD = 2
+
+
+cdef class FlightDescriptor:
+    """A description of a data stream available from a Flight service.
+
+    Instances are created with the ``for_path`` or ``for_command`` static
+    methods; calling the constructor directly raises TypeError.
+    """
+    cdef:
+        CFlightDescriptor descriptor
+
+    def __init__(self):
+        # The literal braces in ``for_{path,command}`` are doubled so that
+        # str.format() emits them verbatim instead of treating them as a
+        # replacement field (which raised KeyError, masking the TypeError).
+        raise TypeError("Do not call {}'s constructor directly, use "
+                        "`pyarrow.flight.FlightDescriptor."
+                        "for_{{path,command}}` "
+                        "function instead."
+                        .format(self.__class__.__name__))
+
+    @staticmethod
+    def for_path(*path):
+        """Create a FlightDescriptor for a resource path.
+
+        Each positional argument is one path component; components are
+        converted with ``tobytes``.
+        """
+        cdef FlightDescriptor result = \
+            FlightDescriptor.__new__(FlightDescriptor)
+        result.descriptor.type = CDescriptorTypePath
+        result.descriptor.path = [tobytes(p) for p in path]
+        return result
+
+    @staticmethod
+    def for_command(command):
+        """Create a FlightDescriptor for an opaque command.
+
+        The command is converted with ``tobytes``.
+        """
+        cdef FlightDescriptor result = \
+            FlightDescriptor.__new__(FlightDescriptor)
+        result.descriptor.type = CDescriptorTypeCmd
+        result.descriptor.cmd = tobytes(command)
+        return result
+
+    @property
+    def descriptor_type(self):
+        """The DescriptorType member matching the underlying descriptor.
+
+        Raises RuntimeError if the underlying type is none of the known
+        values.
+        """
+        if self.descriptor.type == CDescriptorTypeUnknown:
+            return DescriptorType.UNKNOWN
+        elif self.descriptor.type == CDescriptorTypePath:
+            return DescriptorType.PATH
+        elif self.descriptor.type == CDescriptorTypeCmd:
+            return DescriptorType.CMD
+        raise RuntimeError("Invalid descriptor type!")
+
+    @property
+    def command(self):
+        """Get the command for this descriptor.
+
+        Returns None unless this is a CMD descriptor.
+        """
+        if self.descriptor_type != DescriptorType.CMD:
+            return None
+        return self.descriptor.cmd
+
+    @property
+    def path(self):
+        """Get the path for this descriptor.
+
+        Returns None unless this is a PATH descriptor.
+        """
+        if self.descriptor_type != DescriptorType.PATH:
+            return None
+        return self.descriptor.path
+
+    def __repr__(self):
+        # descriptor_type is a property: read it, do not call it.
+        return "<FlightDescriptor type: {!r}>".format(self.descriptor_type)
+
+
+class Ticket:
+ """A ticket for requesting a Flight stream."""
+ def __init__(self, ticket):
+ self.ticket = ticket
+
+ def __repr__(self):
+ return '<Ticket {}>'.format(self.ticket)
+
+
+class Location(collections.namedtuple('Location', ['host', 'port'])):
+ """A location where a Flight stream is available."""
+
+
+cdef class FlightEndpoint:
+ """A Flight stream, along with the ticket and locations to access it."""
+ cdef:
+ CFlightEndpoint endpoint
+
+ def __init__(self, ticket, locations):
+ """Create a FlightEndpoint from a ticket and list of locations.
+
+ Parameters
+ ----------
+ ticket : Ticket or bytes
+ the ticket needed to access this flight
+ locations : list of Location or tuples of (host, port)
+ locations where this flight is available
+ """
+ cdef:
+ CLocation c_location = CLocation()
+
+ if isinstance(ticket, Ticket):
+ self.endpoint.ticket.ticket = ticket.ticket
+ else:
+ self.endpoint.ticket.ticket = ticket
+
+ for location in locations:
+ # Accepts Location namedtuple or tuple
+ c_location.host = tobytes(location[0])
+ c_location.port = location[1]
+ self.endpoint.locations.push_back(c_location)
+
+ @property
+ def ticket(self):
+ return Ticket(self.endpoint.ticket.ticket)
+
+ @property
+ def locations(self):
+ return [Location(frombytes(location.host), location.port)
+ for location in self.endpoint.locations]
+
+
+cdef class FlightInfo:
+ """A description of a Flight stream."""
+ cdef:
+ unique_ptr[CFlightInfo] info
+
+ def __init__(self, Schema schema, FlightDescriptor descriptor, endpoints,
+ total_records, total_bytes):
+ """Create a FlightInfo object from a schema, descriptor, and endpoints.
+
+ Parameters
+ ----------
+ schema : Schema
+ the schema of the data in this flight.
+ descriptor : FlightDescriptor
+ the descriptor for this flight.
+ endpoints : list of FlightEndpoint
+ a list of endpoints where this flight is available.
+ total_records : int
+ the total records in this flight, or -1 if unknown
+ total_bytes : int
+ the total bytes in this flight, or -1 if unknown
+ """
+ cdef:
+ shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+ vector[CFlightEndpoint] c_endpoints
+
+ for endpoint in endpoints:
+ if isinstance(endpoint, FlightEndpoint):
+ c_endpoints.push_back((<FlightEndpoint> endpoint).endpoint)
+ else:
+ raise TypeError('Endpoint {} is not instance of'
+ ' FlightEndpoint'.format(endpoint))
+
+ check_status(CreateFlightInfo(c_schema,
+ descriptor.descriptor,
+ c_endpoints,
+ total_records,
+ total_bytes, &self.info))
+
+ @property
+ def total_records(self):
+ """The total record count of this flight, or -1 if unknown."""
+ return self.info.get().total_records()
+
+ @property
+ def total_bytes(self):
+ """The size in bytes of the data in this flight, or -1 if unknown."""
+ return self.info.get().total_bytes()
+
+ @property
+ def schema(self):
+ """The schema of the data in this flight."""
+ cdef:
+ shared_ptr[CSchema] schema
+ check_status(self.info.get().GetSchema(&schema))
+ return pyarrow_wrap_schema(schema)
+
+ @property
+ def descriptor(self):
+ """The descriptor of the data in this flight."""
+ cdef FlightDescriptor result = \
+ FlightDescriptor.__new__(FlightDescriptor)
+ result.descriptor = self.info.get().descriptor()
+ return result
+
+    @property
+    def endpoints(self):
+        """The endpoints where this flight is available.
+
+        Returns a list of FlightEndpoint objects, one per endpoint of the
+        underlying FlightInfo.
+        """
+        # TODO: get Cython to iterate over reference directly
+        cdef:
+            vector[CFlightEndpoint] endpoints = self.info.get().endpoints()
+            FlightEndpoint py_endpoint
+
+        result = []
+        for endpoint in endpoints:
+            # __new__ needs the class as its first argument; it is used
+            # here so the wrapper is created without running __init__,
+            # which requires a ticket and locations.
+            py_endpoint = FlightEndpoint.__new__(FlightEndpoint)
+            py_endpoint.endpoint = endpoint
+            result.append(py_endpoint)
+        return result
+
+
+cdef class FlightRecordBatchReader(_CRecordBatchReader, _ReadPandasOption):
+ cdef dict __dict__
+
+
+cdef class FlightRecordBatchWriter(_CRecordBatchWriter):
+ pass
+
+
+cdef class FlightClient:
+ """A client to a Flight service."""
+ cdef:
+ unique_ptr[CFlightClient] client
+
+ def __init__(self):
+ raise TypeError("Do not call {}'s constructor directly, use "
+ "`pyarrow.flight.FlightClient.connect` instead."
+ .format(self.__class__.__name__))
+
+ @staticmethod
+ def connect(*args):
+ """Connect to a Flight service on the given host and port."""
+ cdef:
+ FlightClient result = FlightClient.__new__(FlightClient)
+ int c_port = 0
+ c_string c_host
+
+ if len(args) == 1:
+ # Accept namedtuple or plain tuple
+ c_host = tobytes(args[0][0])
+ c_port = args[0][1]
+ elif len(args) == 2:
+ # Accept separate host, port
+ c_host = tobytes(args[0])
+ c_port = args[1]
+ else:
+ raise TypeError("FlightClient.connect() takes 1 "
+ "or 2 arguments ({} given)".format(len(args)))
+
+ with nogil:
+ check_status(CFlightClient.Connect(c_host, c_port, &result.client))
+
+ return result
+
+ def list_actions(self):
+ """List the actions available on a service."""
+ cdef:
+ vector[CActionType] results
+
+ with nogil:
+ check_status(self.client.get().ListActions(&results))
+
+ result = []
+ for action_type in results:
+ py_action = ActionType()
+ py_action.action_type = action_type
+ result.append(py_action)
+
+ return result
+
+ def do_action(self, action: Action):
+ """Execute an action on a service."""
+ cdef:
+ unique_ptr[CResultStream] results
+ with nogil:
+ check_status(self.client.get().DoAction(action.action, &results))
+
+ while True:
+ result = Result()
+ with nogil:
+ check_status(results.get().Next(&result.result))
+ if result.result == NULL:
+ break
+ yield result
+
+ def list_flights(self):
+ """List the flights available on a service."""
+ cdef:
+ unique_ptr[CFlightListing] listing
+ FlightInfo result
+
+ with nogil:
+ check_status(self.client.get().ListFlights(&listing))
+
+ while True:
+ result = FlightInfo.__new__(FlightInfo)
+ with nogil:
+ check_status(listing.get().Next(&result.info))
+ if result.info == NULL:
+ break
+ yield result
+
+ def get_flight_info(self, descriptor: FlightDescriptor):
+ """Request information about an available flight."""
+ cdef:
+ FlightInfo result = FlightInfo.__new__(FlightInfo)
+
+ with nogil:
+ check_status(self.client.get().GetFlightInfo(
+ descriptor.descriptor, &result.info))
+
+ return result
+
+ def do_get(self, ticket: Ticket, schema: Schema):
+ """Request the data for a flight."""
+ cdef:
+ # TODO: introduce unwrap
+ CTicket c_ticket
+ shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+ unique_ptr[CRecordBatchReader] reader
+
+ c_ticket.ticket = ticket.ticket
+ with nogil:
+ check_status(self.client.get().DoGet(c_ticket, c_schema, &reader))
+ result = FlightRecordBatchReader()
+ result.reader.reset(reader.release())
+ return result
+
+ def do_put(self, descriptor: FlightDescriptor, schema: Schema):
+ """Upload data to a flight."""
+ cdef:
+ shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+ unique_ptr[CRecordBatchWriter] writer
+
+ with nogil:
+ check_status(self.client.get().DoPut(
+ descriptor.descriptor, c_schema, &writer))
+ result = FlightRecordBatchWriter()
+ result.writer.reset(writer.release())
+ return result
+
+
+cdef class FlightDataStream:
+ cdef:
+ unique_ptr[CFlightDataStream] stream
+
+
+cdef class RecordBatchStream(FlightDataStream):
+ def __init__(self, reader):
+ # TODO: we don't really expose the readers in Python.
+ pass
+
+
+cdef void _get_flight_info(void* self, CFlightDescriptor c_descriptor,
+ unique_ptr[CFlightInfo]* info):
+ """Callback for implementing Flight servers in Python."""
+ raise NotImplementedError("GetFlightInfo is not implemented")
+
+
+cdef void _do_put(void* self, unique_ptr[CFlightMessageReader] reader):
+ """Callback for implementing Flight servers in Python."""
+ cdef:
+ FlightRecordBatchReader py_reader = FlightRecordBatchReader()
+ FlightDescriptor descriptor = \
+ FlightDescriptor.__new__(FlightDescriptor)
+
+ descriptor.descriptor = reader.get().descriptor()
+ py_reader.reader.reset(reader.release())
+ (<object> self).do_put(descriptor, py_reader)
+
+
+cdef void _do_get(void* self, CTicket ticket,
+                  unique_ptr[CFlightDataStream]* stream):
+    """Callback for implementing Flight servers in Python.
+
+    Wraps the C++ ticket in a Python Ticket, calls the server object's
+    ``do_get`` with it, and moves the stream held by the returned
+    FlightDataStream into ``stream``. Raises TypeError if ``do_get``
+    returns anything other than a FlightDataStream.
+    """
+    # Ticket.__init__ takes the ticket payload as a required argument.
+    py_ticket = Ticket(ticket.ticket)
+    result = (<object> self).do_get(py_ticket)
+    if not isinstance(result, FlightDataStream):
+        raise TypeError("FlightServerBase.do_get must return "
+                        "a FlightDataStream")
+    stream[0] = move((<FlightDataStream> result).stream)
+
+
+cdef class FlightServerBase:
+    """A Flight service definition.
+
+    Subclasses override ``get_flight_info``, ``do_get`` and ``do_put``;
+    each default implementation raises NotImplementedError.
+    """
+
+    cdef:
+        unique_ptr[PyFlightServer] server
+
+    def run(self, port):
+        """Create the underlying PyFlightServer and run it on ``port``.
+
+        The GIL is released for the duration of the server's Run() call.
+        """
+        cdef:
+            PyFlightServerVtable vtable = PyFlightServerVtable()
+            int c_port = port
+        # Register the module-level callbacks that forward to this object.
+        vtable.get_flight_info = &_get_flight_info
+        vtable.do_put = &_do_put
+        vtable.do_get = &_do_get
+        self.server.reset(new PyFlightServer(self, vtable))
+        with nogil:
+            self.server.get().Run(c_port)
+
+    def get_flight_info(self, descriptor):
+        """Return information about the flight named by ``descriptor``."""
+        raise NotImplementedError
+
+    def do_get(self, ticket):
+        """Return the data for ``ticket``.
+
+        The _do_get callback requires the return value to be a
+        FlightDataStream.
+        """
+        raise NotImplementedError
+
+    def do_put(self, descriptor, reader):
+        """Receive data for ``descriptor`` from ``reader``."""
+        raise NotImplementedError
+
+    def shutdown(self):
+        """Shut down the underlying server, if one has been created."""
+        if self.server.get() != NULL:
+            self.server.get().Shutdown()
diff --git a/python/pyarrow/flight.py b/python/pyarrow/flight.py
new file mode 100644
index 0000000..d238c37
--- /dev/null
+++ b/python/pyarrow/flight.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow._flight import (FlightClient, Action, ActionType, # noqa
+ FlightDescriptor, FlightInfo, Ticket, Location,
+ FlightServerBase, DescriptorType)
diff --git a/python/pyarrow/includes/common.pxd
b/python/pyarrow/includes/common.pxd
index 1b13ff0..97e23f9 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -41,8 +41,12 @@ cdef extern from "numpy/halffloat.h":
cdef extern from "arrow/api.h" namespace "arrow" nogil:
# We can later add more of the common status factory methods as needed
- cdef CStatus CStatus_OK "Status::OK"()
- cdef CStatus CStatus_Invalid "Status::Invalid"()
+ cdef CStatus CStatus_OK "arrow::Status::OK"()
+ cdef CStatus CStatus_Invalid "arrow::Status::Invalid"()
+ cdef CStatus CStatus_NotImplemented \
+ "arrow::Status::NotImplemented"(const c_string& msg)
+ cdef CStatus CStatus_UnknownError \
+ "arrow::Status::UnknownError"(const c_string& msg)
cdef cppclass CStatus "arrow::Status":
CStatus()
diff --git a/python/pyarrow/includes/libarrow_flight.pxd
b/python/pyarrow/includes/libarrow_flight.pxd
new file mode 100644
index 0000000..4d64f52
--- /dev/null
+++ b/python/pyarrow/includes/libarrow_flight.pxd
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from libcpp.functional cimport function
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
+ # Declarations of the C++ Flight types from "arrow/flight/api.h".
+ # The quoted string after each Cython name is the C++ name it binds to.
+
+ # Actions and the results they produce.
+ cdef cppclass CActionType" arrow::flight::ActionType":
+ c_string type
+ c_string description
+
+ cdef cppclass CAction" arrow::flight::Action":
+ c_string type
+ shared_ptr[CBuffer] body
+
+ cdef cppclass CResult" arrow::flight::Result":
+ shared_ptr[CBuffer] body
+
+ cdef cppclass CResultStream" arrow::flight::ResultStream":
+ CStatus Next(unique_ptr[CResult]* result)
+
+ # Descriptor kind, followed by the three values bound from
+ # FlightDescriptor (UNKNOWN, PATH, CMD).
+ cdef cppclass CDescriptorType \
+ " arrow::flight::FlightDescriptor::DescriptorType":
+ bint operator==(CDescriptorType)
+
+ CDescriptorType CDescriptorTypeUnknown\
+ " arrow::flight::FlightDescriptor::UNKNOWN"
+ CDescriptorType CDescriptorTypePath\
+ " arrow::flight::FlightDescriptor::PATH"
+ CDescriptorType CDescriptorTypeCmd\
+ " arrow::flight::FlightDescriptor::CMD"
+
+ cdef cppclass CFlightDescriptor" arrow::flight::FlightDescriptor":
+ CDescriptorType type
+ c_string cmd
+ vector[c_string] path
+
+ # Tickets, locations and the endpoints that combine them.
+ cdef cppclass CTicket" arrow::flight::Ticket":
+ CTicket()
+ c_string ticket
+
+ cdef cppclass CLocation" arrow::flight::Location":
+ CLocation()
+
+ c_string host
+ int32_t port
+
+ cdef cppclass CFlightEndpoint" arrow::flight::FlightEndpoint":
+ CFlightEndpoint()
+
+ CTicket ticket
+ vector[CLocation] locations
+
+ # Flight metadata and listings of it.
+ cdef cppclass CFlightInfo" arrow::flight::FlightInfo":
+ uint64_t total_records()
+ uint64_t total_bytes()
+ CStatus GetSchema(shared_ptr[CSchema]* out)
+ CFlightDescriptor& descriptor()
+ const vector[CFlightEndpoint]& endpoints()
+
+ cdef cppclass CFlightListing" arrow::flight::FlightListing":
+ CStatus Next(unique_ptr[CFlightInfo]* info)
+
+ # Declared as a subclass of CRecordBatchReader that also exposes the
+ # descriptor.
+ cdef cppclass CFlightMessageReader \
+ " arrow::flight::FlightMessageReader"(CRecordBatchReader):
+ CFlightDescriptor& descriptor()
+
+ # Opaque here: no members are declared on the base data stream type.
+ cdef cppclass CFlightDataStream" arrow::flight::FlightDataStream":
+ pass
+
+ cdef cppclass CRecordBatchStream \
+ " arrow::flight::RecordBatchStream"(CFlightDataStream):
+ CRecordBatchStream(shared_ptr[CRecordBatchReader]& reader)
+
+ # Client-side API.
+ cdef cppclass CFlightClient" arrow::flight::FlightClient":
+ @staticmethod
+ CStatus Connect(const c_string& host, int port,
+ unique_ptr[CFlightClient]* client)
+
+ CStatus DoAction(CAction& action, unique_ptr[CResultStream]* results)
+ CStatus ListActions(vector[CActionType]* actions)
+
+ CStatus ListFlights(unique_ptr[CFlightListing]* listing)
+ CStatus GetFlightInfo(CFlightDescriptor& descriptor,
+ unique_ptr[CFlightInfo]* info)
+
+ CStatus DoGet(CTicket& ticket, shared_ptr[CSchema]& schema,
+ unique_ptr[CRecordBatchReader]* stream)
+ CStatus DoPut(CFlightDescriptor& descriptor,
+ shared_ptr[CSchema]& schema,
+ unique_ptr[CRecordBatchWriter]* stream)
+
+
+# Callbacks for implementing Flight servers
+# Use typedef to emulate syntax for std::function<void(...)>
+# These are the signatures held by the function[...] members of
+# PyFlightServerVtable, declared further down in this file.
+# NOTE(review): the first parameter is ``object`` here, while the callbacks
+# defined in _flight.pyx take ``void* self`` -- confirm the two are meant to
+# be interchangeable.
+ctypedef void cb_get_flight_info(object, const CFlightDescriptor&,
+ unique_ptr[CFlightInfo]*)
+ctypedef void cb_do_put(object, unique_ptr[CFlightMessageReader])
+ctypedef void cb_do_get(object, const CTicket&,
+ unique_ptr[CFlightDataStream]*)
+
+cdef extern from "arrow/python/flight.h" namespace "arrow::py::flight" nogil:
+ # Declarations of the helpers from "arrow/python/flight.h".
+
+ # Table of callbacks, one std::function-style slot per server method.
+ cdef cppclass PyFlightServerVtable:
+ PyFlightServerVtable()
+ function[cb_get_flight_info] get_flight_info
+ function[cb_do_put] do_put
+ function[cb_do_get] do_get
+
+ # Server constructed from a Python object plus the callback table above.
+ cdef cppclass PyFlightServer:
+ PyFlightServer(object server, PyFlightServerVtable vtable)
+ void Run(int port)
+ void Shutdown()
+
+ # Builds a CFlightInfo from its parts; the result is written to ``out``.
+ cdef CStatus CreateFlightInfo" arrow::py::flight::CreateFlightInfo"(
+ shared_ptr[CSchema] schema,
+ CFlightDescriptor& descriptor,
+ vector[CFlightEndpoint] endpoints,
+ uint64_t total_records,
+ uint64_t total_bytes,
+ unique_ptr[CFlightInfo]* out)
+
+cdef extern from "<utility>" namespace "std":
+ # std::move, declared here only for unique_ptr[CFlightDataStream].
+ unique_ptr[CFlightDataStream] move(unique_ptr[CFlightDataStream])
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 137d526..e857302 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -148,26 +148,14 @@ cdef class MessageReader:
# ----------------------------------------------------------------------
# File and stream readers and writers
-cdef class _RecordBatchWriter:
- cdef:
- shared_ptr[CRecordBatchWriter] writer
- shared_ptr[OutputStream] sink
- bint closed
-
- def __cinit__(self):
- pass
-
- def __dealloc__(self):
- pass
+cdef class _CRecordBatchWriter:
+ """The base RecordBatchWriter wrapper.
- def _open(self, sink, Schema schema):
- get_writer(sink, &self.sink)
+ Provides common implementations of convenience methods. Should not
+ be instantiated directly by user code.
+ """
- with nogil:
- check_status(
- CRecordBatchStreamWriter.Open(self.sink.get(),
- schema.sp_schema,
- &self.writer))
+ # cdef block is in lib.pxd
def write(self, table_or_batch):
"""
@@ -222,6 +210,33 @@ cdef class _RecordBatchWriter:
with nogil:
check_status(self.writer.get().Close())
+ def __enter__(self):
+ """Enter a ``with`` block; the writer itself is the context value."""
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """Close the writer on leaving a ``with`` block.
+
+ The exception arguments are ignored and nothing is returned, so an
+ exception raised inside the block is not suppressed.
+ """
+ self.close()
+
+
+cdef class _RecordBatchStreamWriter(_CRecordBatchWriter):
+ """Record batch writer that opens a CRecordBatchStreamWriter on a sink.
+
+ The ``writer`` attribute filled in by ``_open`` is not declared in this
+ class; it comes from the ``_CRecordBatchWriter`` base.
+ """
+ cdef:
+ shared_ptr[OutputStream] sink
+ # NOTE(review): ``closed`` is not assigned anywhere in this class as
+ # shown -- confirm where it is maintained.
+ bint closed
+
+ def __cinit__(self):
+ pass
+
+ def __dealloc__(self):
+ pass
+
+ def _open(self, sink, Schema schema):
+ """Resolve ``sink`` to an output stream and open the writer on it.
+
+ The stream is stored in ``self.sink`` and the opened writer in
+ ``self.writer``; the returned status goes through ``check_status``.
+ """
+ get_writer(sink, &self.sink)
+
+ # The C++ Open call runs with the GIL released.
+ with nogil:
+ check_status(
+ CRecordBatchStreamWriter.Open(self.sink.get(),
+ schema.sp_schema,
+ &self.writer))
+
cdef _get_input_stream(object source, shared_ptr[InputStream]* out):
cdef:
@@ -237,24 +252,14 @@ cdef _get_input_stream(object source,
shared_ptr[InputStream]* out):
out[0] = <shared_ptr[InputStream]> file_handle
-cdef class _RecordBatchReader:
- cdef:
- shared_ptr[CRecordBatchReader] reader
- shared_ptr[InputStream] in_stream
-
- cdef readonly:
- Schema schema
-
- def __cinit__(self):
- pass
+cdef class _CRecordBatchReader:
+ """The base RecordBatchReader wrapper.
- def _open(self, source):
- _get_input_stream(source, &self.in_stream)
- with nogil:
- check_status(CRecordBatchStreamReader.Open(
- self.in_stream.get(), &self.reader))
+ Provides common implementations of convenience methods. Should not
+ be instantiated directly by user code.
+ """
- self.schema = pyarrow_wrap_schema(self.reader.get().schema())
+ # cdef block is in lib.pxd
def __iter__(self):
while True:
@@ -291,7 +296,26 @@ cdef class _RecordBatchReader:
return pyarrow_wrap_table(table)
-cdef class _RecordBatchFileWriter(_RecordBatchWriter):
+cdef class _RecordBatchStreamReader(_CRecordBatchReader):
+ """Record batch reader that opens a CRecordBatchStreamReader on a source.
+
+ The ``reader`` attribute filled in by ``_open`` is not declared in this
+ class; it comes from the ``_CRecordBatchReader`` base.
+ """
+ cdef:
+ shared_ptr[InputStream] in_stream
+
+ # Readable from Python; set by ``_open``.
+ cdef readonly:
+ Schema schema
+
+ def __cinit__(self):
+ pass
+
+ def _open(self, source):
+ """Resolve ``source`` to an input stream and open the reader on it.
+
+ The stream is stored in ``self.in_stream`` and the opened reader in
+ ``self.reader``; ``self.schema`` is then set from the reader's schema.
+ """
+ _get_input_stream(source, &self.in_stream)
+ # The C++ Open call runs with the GIL released.
+ with nogil:
+ check_status(CRecordBatchStreamReader.Open(
+ self.in_stream.get(), &self.reader))
+
+ self.schema = pyarrow_wrap_schema(self.reader.get().schema())
+
+
+cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
def _open(self, sink, Schema schema):
get_writer(sink, &self.sink)
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index a79cafe..78bb347 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -45,7 +45,7 @@ class _ReadPandasOption(object):
return table.to_pandas(**options)
-class RecordBatchStreamReader(lib._RecordBatchReader, _ReadPandasOption):
+class RecordBatchStreamReader(lib._RecordBatchStreamReader, _ReadPandasOption):
"""
Reader for the Arrow streaming binary format
@@ -58,7 +58,7 @@ class RecordBatchStreamReader(lib._RecordBatchReader,
_ReadPandasOption):
self._open(source)
-class RecordBatchStreamWriter(lib._RecordBatchWriter):
+class RecordBatchStreamWriter(lib._RecordBatchStreamWriter):
"""
Writer for the Arrow streaming binary format
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 8cd8f40..6f14e76 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -383,6 +383,16 @@ cdef class NativeFile:
cdef shared_ptr[OutputStream] get_output_stream(self) except *
+# Attribute declaration for the base writer wrapper: it holds the shared
+# C++ CRecordBatchWriter in ``writer``.
+cdef class _CRecordBatchWriter:
+ cdef:
+ shared_ptr[CRecordBatchWriter] writer
+
+
+# Attribute declaration for the base reader wrapper: it holds the shared
+# C++ CRecordBatchReader in ``reader``.
+cdef class _CRecordBatchReader:
+ cdef:
+ shared_ptr[CRecordBatchReader] reader
+
+
cdef get_input_stream(object source, c_bool use_memory_map,
shared_ptr[InputStream]* reader)
cdef get_reader(object source, c_bool use_memory_map,
diff --git a/python/requirements.txt b/python/requirements.txt
index 3a23d1d..ba67f6b 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,3 +1,4 @@
six>=1.0.0
numpy>=1.14
futures; python_version < "3.2"
+enum34 >= 1.1.6; python_version < "3.4"
diff --git a/python/setup.py b/python/setup.py
index e4a7936..0fc89c0 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -99,6 +99,7 @@ class build_ext(_build_ext):
('boost-namespace=', None,
'namespace of boost (default: boost)'),
('with-cuda', None, 'build the Cuda extension'),
+ ('with-flight', None, 'build the Flight extension'),
('with-parquet', None, 'build the Parquet extension'),
('with-static-parquet', None, 'link parquet statically'),
('with-static-boost', None, 'link boost statically'),
@@ -136,6 +137,8 @@ class build_ext(_build_ext):
self.with_cuda = strtobool(
os.environ.get('PYARROW_WITH_CUDA', '0'))
+ self.with_flight = strtobool(
+ os.environ.get('PYARROW_WITH_FLIGHT', '0'))
self.with_parquet = strtobool(
os.environ.get('PYARROW_WITH_PARQUET', '0'))
self.with_static_parquet = strtobool(
@@ -161,6 +164,7 @@ class build_ext(_build_ext):
'lib',
'_csv',
'_cuda',
+ '_flight',
'_parquet',
'_orc',
'_plasma',
@@ -200,6 +204,8 @@ class build_ext(_build_ext):
cmake_options += ['-G', self.cmake_generator]
if self.with_cuda:
cmake_options.append('-DPYARROW_BUILD_CUDA=on')
+ if self.with_flight:
+ cmake_options.append('-DPYARROW_BUILD_FLIGHT=on')
if self.with_parquet:
cmake_options.append('-DPYARROW_BUILD_PARQUET=on')
if self.with_static_parquet:
@@ -344,6 +350,8 @@ class build_ext(_build_ext):
move_shared_libs(build_prefix, build_lib, "arrow_python")
if self.with_cuda:
move_shared_libs(build_prefix, build_lib, "arrow_gpu")
+ if self.with_flight:
+ move_shared_libs(build_prefix, build_lib, "arrow_flight")
if self.with_plasma:
move_shared_libs(build_prefix, build_lib, "plasma")
if self.with_gandiva:
@@ -384,6 +392,8 @@ class build_ext(_build_ext):
return True
if name == '_orc' and not self.with_orc:
return True
+ if name == '_flight' and not self.with_flight:
+ return True
if name == '_cuda' and not self.with_cuda:
return True
if name == 'gandiva' and not self.with_gandiva:
@@ -530,7 +540,8 @@ class BinaryDistribution(Distribution):
install_requires = (
'numpy >= 1.14',
'six >= 1.0.0',
- 'futures; python_version < "3.2"'
+ 'futures; python_version < "3.2"',
+ 'enum34 >= 1.1.6; python_version < "3.4"',
)