davisusanibar commented on a change in pull request #137:
URL: https://github.com/apache/arrow-cookbook/pull/137#discussion_r800783552
##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1699 @@
+.. _arrow-flight:
+
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Simple Service with Arrow Flight
+================================
+
+We are going to create a Flight Producer and a Flight Server:
+
+* InMemoryStore: A FlightProducer that hosts an in memory store of Arrow
buffers. Used for integration testing.
+
+* ExampleFlightServer: An Example Flight Server that provides access to the
InMemoryStore.
+
+Creating the Server
+*******************
+
+.. testcode::
+
+ import org.apache.arrow.flight.Action;
+ import org.apache.arrow.flight.ActionType;
+ import org.apache.arrow.flight.CallStatus;
+ import org.apache.arrow.flight.Criteria;
+ import org.apache.arrow.flight.FlightDescriptor;
+ import org.apache.arrow.flight.FlightInfo;
+ import org.apache.arrow.flight.FlightProducer;
+ import org.apache.arrow.flight.FlightStream;
+ import org.apache.arrow.flight.Location;
+ import org.apache.arrow.flight.PutResult;
+ import org.apache.arrow.flight.Result;
+ import org.apache.arrow.flight.Ticket;
+ import org.apache.arrow.flight.example.ExampleTicket;
+ import org.apache.arrow.flight.example.FlightHolder;
+ import org.apache.arrow.flight.example.Stream;
+ import org.apache.arrow.flight.example.Stream.StreamCreator;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.util.AutoCloseables;
+ import org.apache.arrow.vector.VectorSchemaRoot;
+ import org.apache.arrow.vector.VectorUnloader;
+
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+
+ // InMemoryStore
+
+ public class InMemoryStore implements FlightProducer, AutoCloseable {
+ private final ConcurrentMap<FlightDescriptor, FlightHolder> holders =
new ConcurrentHashMap<>();
+ private final BufferAllocator allocator;
+ private Location location;
+
+ public InMemoryStore(BufferAllocator allocator, Location location) {
+ super();
+ this.allocator = allocator;
+ this.location = location;
+ }
+
+ public void setLocation(Location location) {
+ this.location = location;
+ }
+
+ @Override
+ public void getStream(CallContext context, Ticket ticket,
+ FlightProducer.ServerStreamListener listener) {
+ System.out.println("Calling to getStream");
+ getStream(ticket).sendTo(allocator, listener);
+ }
+
+ public Stream getStream(Ticket t) {
+ ExampleTicket example = ExampleTicket.from(t);
+ FlightDescriptor d = FlightDescriptor.path(example.getPath());
+ FlightHolder h = holders.get(d);
+ if (h == null) {
+ throw new IllegalStateException("Unknown ticket.");
+ }
+
+ return h.getStream(example);
+ }
+
+ @Override
+ public void listFlights(CallContext context, Criteria criteria,
StreamListener<FlightInfo> listener) {
+ System.out.println("Calling to listFligths");
Review comment:
Deleted
##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1699 @@
+.. _arrow-flight:
+
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Simple Service with Arrow Flight
+================================
+
+We are going to create a Flight Producer and a Flight Server:
+
+* InMemoryStore: A FlightProducer that hosts an in memory store of Arrow
buffers. Used for integration testing.
+
+* ExampleFlightServer: An Example Flight Server that provides access to the
InMemoryStore.
+
+Creating the Server
+*******************
+
+.. testcode::
+
+ import org.apache.arrow.flight.Action;
+ import org.apache.arrow.flight.ActionType;
+ import org.apache.arrow.flight.CallStatus;
+ import org.apache.arrow.flight.Criteria;
+ import org.apache.arrow.flight.FlightDescriptor;
+ import org.apache.arrow.flight.FlightInfo;
+ import org.apache.arrow.flight.FlightProducer;
+ import org.apache.arrow.flight.FlightStream;
+ import org.apache.arrow.flight.Location;
+ import org.apache.arrow.flight.PutResult;
+ import org.apache.arrow.flight.Result;
+ import org.apache.arrow.flight.Ticket;
+ import org.apache.arrow.flight.example.ExampleTicket;
+ import org.apache.arrow.flight.example.FlightHolder;
+ import org.apache.arrow.flight.example.Stream;
+ import org.apache.arrow.flight.example.Stream.StreamCreator;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.util.AutoCloseables;
+ import org.apache.arrow.vector.VectorSchemaRoot;
+ import org.apache.arrow.vector.VectorUnloader;
+
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+
+ // InMemoryStore
+
+ public class InMemoryStore implements FlightProducer, AutoCloseable {
+ private final ConcurrentMap<FlightDescriptor, FlightHolder> holders =
new ConcurrentHashMap<>();
+ private final BufferAllocator allocator;
+ private Location location;
+
+ public InMemoryStore(BufferAllocator allocator, Location location) {
+ super();
+ this.allocator = allocator;
+ this.location = location;
+ }
+
+ public void setLocation(Location location) {
+ this.location = location;
+ }
+
+ @Override
+ public void getStream(CallContext context, Ticket ticket,
+ FlightProducer.ServerStreamListener listener) {
+ System.out.println("Calling to getStream");
+ getStream(ticket).sendTo(allocator, listener);
+ }
+
+ public Stream getStream(Ticket t) {
+ ExampleTicket example = ExampleTicket.from(t);
+ FlightDescriptor d = FlightDescriptor.path(example.getPath());
+ FlightHolder h = holders.get(d);
+ if (h == null) {
+ throw new IllegalStateException("Unknown ticket.");
+ }
+
+ return h.getStream(example);
+ }
+
+ @Override
+ public void listFlights(CallContext context, Criteria criteria,
StreamListener<FlightInfo> listener) {
+ System.out.println("Calling to listFligths");
+ try {
+ for (FlightHolder h : holders.values()) {
+ listener.onNext(h.getFlightInfo(location));
+ }
+ listener.onCompleted();
+ } catch (Exception ex) {
+ listener.onError(ex);
+ }
+ }
+
+ @Override
+ public FlightInfo getFlightInfo(CallContext context, FlightDescriptor
descriptor) {
+ System.out.println("Calling to getFlightInfo");
+ FlightHolder h = holders.get(descriptor);
+ if (h == null) {
+ throw new IllegalStateException("Unknown descriptor.");
+ }
+
+ return h.getFlightInfo(location);
+ }
+
+ @Override
+ public Runnable acceptPut(CallContext context,
+ final FlightStream flightStream, final
StreamListener<PutResult> ackStream) {
+ return () -> {
+ System.out.println("Calling to acceptPut");
+ StreamCreator creator = null;
+ boolean success = false;
+ try (VectorSchemaRoot root = flightStream.getRoot()) {
+ final FlightHolder h = holders.computeIfAbsent(
+ flightStream.getDescriptor(),
+ t -> new FlightHolder(allocator, t,
flightStream.getSchema(), flightStream.getDictionaryProvider()));
+
+ creator = h.addStream(flightStream.getSchema());
+
+ VectorUnloader unloader = new VectorUnloader(root);
+ while (flightStream.next()) {
+
ackStream.onNext(PutResult.metadata(flightStream.getLatestMetadata()));
Review comment:
Deleted
##########
File path: java/source/flight.rst
##########
@@ -0,0 +1,1699 @@
+.. _arrow-flight:
+
+============
+Arrow Flight
+============
+
+Recipes related to leveraging Arrow Flight protocol
+
+.. contents::
+
+Simple Service with Arrow Flight
+================================
+
+We are going to create a Flight Producer and a Flight Server:
+
+* InMemoryStore: A FlightProducer that hosts an in memory store of Arrow
buffers. Used for integration testing.
+
+* ExampleFlightServer: An Example Flight Server that provides access to the
InMemoryStore.
+
+Creating the Server
+*******************
+
+.. testcode::
+
+ import org.apache.arrow.flight.Action;
+ import org.apache.arrow.flight.ActionType;
+ import org.apache.arrow.flight.CallStatus;
+ import org.apache.arrow.flight.Criteria;
+ import org.apache.arrow.flight.FlightDescriptor;
+ import org.apache.arrow.flight.FlightInfo;
+ import org.apache.arrow.flight.FlightProducer;
+ import org.apache.arrow.flight.FlightStream;
+ import org.apache.arrow.flight.Location;
+ import org.apache.arrow.flight.PutResult;
+ import org.apache.arrow.flight.Result;
+ import org.apache.arrow.flight.Ticket;
+ import org.apache.arrow.flight.example.ExampleTicket;
+ import org.apache.arrow.flight.example.FlightHolder;
+ import org.apache.arrow.flight.example.Stream;
+ import org.apache.arrow.flight.example.Stream.StreamCreator;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.util.AutoCloseables;
+ import org.apache.arrow.vector.VectorSchemaRoot;
+ import org.apache.arrow.vector.VectorUnloader;
+
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+
+ // InMemoryStore
+
+ public class InMemoryStore implements FlightProducer, AutoCloseable {
+ private final ConcurrentMap<FlightDescriptor, FlightHolder> holders =
new ConcurrentHashMap<>();
+ private final BufferAllocator allocator;
+ private Location location;
+
+ public InMemoryStore(BufferAllocator allocator, Location location) {
+ super();
+ this.allocator = allocator;
+ this.location = location;
+ }
+
+ public void setLocation(Location location) {
+ this.location = location;
+ }
+
+ @Override
+ public void getStream(CallContext context, Ticket ticket,
+ FlightProducer.ServerStreamListener listener) {
+ System.out.println("Calling to getStream");
+ getStream(ticket).sendTo(allocator, listener);
+ }
+
+ public Stream getStream(Ticket t) {
+ ExampleTicket example = ExampleTicket.from(t);
+ FlightDescriptor d = FlightDescriptor.path(example.getPath());
+ FlightHolder h = holders.get(d);
+ if (h == null) {
+ throw new IllegalStateException("Unknown ticket.");
+ }
+
+ return h.getStream(example);
+ }
+
+ @Override
+ public void listFlights(CallContext context, Criteria criteria,
StreamListener<FlightInfo> listener) {
+ System.out.println("Calling to listFligths");
+ try {
+ for (FlightHolder h : holders.values()) {
+ listener.onNext(h.getFlightInfo(location));
+ }
+ listener.onCompleted();
+ } catch (Exception ex) {
+ listener.onError(ex);
+ }
+ }
+
+ @Override
+ public FlightInfo getFlightInfo(CallContext context, FlightDescriptor
descriptor) {
+ System.out.println("Calling to getFlightInfo");
+ FlightHolder h = holders.get(descriptor);
+ if (h == null) {
+ throw new IllegalStateException("Unknown descriptor.");
+ }
+
+ return h.getFlightInfo(location);
+ }
+
+ @Override
+ public Runnable acceptPut(CallContext context,
+ final FlightStream flightStream, final
StreamListener<PutResult> ackStream) {
+ return () -> {
+ System.out.println("Calling to acceptPut");
+ StreamCreator creator = null;
+ boolean success = false;
+ try (VectorSchemaRoot root = flightStream.getRoot()) {
+ final FlightHolder h = holders.computeIfAbsent(
+ flightStream.getDescriptor(),
+ t -> new FlightHolder(allocator, t,
flightStream.getSchema(), flightStream.getDictionaryProvider()));
+
+ creator = h.addStream(flightStream.getSchema());
+
+ VectorUnloader unloader = new VectorUnloader(root);
+ while (flightStream.next()) {
+
ackStream.onNext(PutResult.metadata(flightStream.getLatestMetadata()));
+ creator.add(unloader.getRecordBatch());
+ }
+ // Closing the stream will release the dictionaries
+ flightStream.takeDictionaryOwnership();
+ creator.complete();
+ success = true;
+ } finally {
+ if (!success) {
+ creator.drop();
+ }
+ }
+
+ };
+
+ }
+
+ @Override
+ public void doAction(CallContext context, Action action,
+ StreamListener<Result> listener) {
+ System.out.println("Calling to doAction");
+ switch (action.getType()) {
+ case "drop": {
+ listener.onNext(new Result(new byte[0]));
+ listener.onCompleted();
+ break;
+ }
+ default: {
+
listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
+ }
+ }
+ }
+
+ @Override
+ public void listActions(CallContext context,
+ StreamListener<ActionType> listener) {
+ System.out.println("Calling to listActions");
+ listener.onNext(new ActionType("get", "pull a stream. Action must
be done via standard get mechanism"));
+ listener.onNext(new ActionType("put", "push a stream. Action must
be done via standard put mechanism"));
Review comment:
Deleted
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]