David Dali Susanibar Arce created ARROW-15703:
-------------------------------------------------

             Summary: [Java]: Create custom sphinx plugin to help us with java 
verbose code to showcase highlighting code
                 Key: ARROW-15703
                 URL: https://issues.apache.org/jira/browse/ARROW-15703
             Project: Apache Arrow
          Issue Type: Sub-task
            Reporter: David Dali Susanibar Arce
            Assignee: David Dali Susanibar Arce


We are running java cookbook code thru sphinx using our custom extension 
[https://github.com/apache/arrow-cookbook/blob/main/java/ext/javadoctest.py] 

 

We need to create another extension to only show our end user the java code 
that is needed to showcase but running the whole java code cookbook at testing 
part.

 

Current documentation:
{code:java}
Validate Delete Data
********************

And confirm that it's been deleted:

.. testcode::

    import org.apache.arrow.flight.Action;
    import org.apache.arrow.flight.AsyncPutListener;
    import org.apache.arrow.flight.Criteria;
    import org.apache.arrow.flight.FlightClient;
    import org.apache.arrow.flight.FlightDescriptor;
    import org.apache.arrow.flight.FlightEndpoint;
    import org.apache.arrow.flight.FlightInfo;
    import org.apache.arrow.flight.FlightServer;
    import org.apache.arrow.flight.FlightStream;
    import org.apache.arrow.flight.Location;
    import org.apache.arrow.flight.NoOpFlightProducer;
    import org.apache.arrow.flight.PutResult;
    import org.apache.arrow.flight.Result;
    import org.apache.arrow.flight.Ticket;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VarCharVector;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.VectorUnloader;
    import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.FieldType;
    import org.apache.arrow.vector.types.pojo.Schema;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    class DataInMemory {
        private List<ArrowRecordBatch> listArrowRecordBatch;
        private Schema schema;
        private Long rows;
        public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, Schema 
schema, Long rows) {
            this.listArrowRecordBatch = listArrowRecordBatch;
            this.schema = schema;
            this.rows = rows;
        }
        public List<ArrowRecordBatch> getListArrowRecordBatch() {
            return listArrowRecordBatch;
        }
        public Schema getSchema() {
            return schema;
        }
        public Long getRows() {
            return rows;
        }
    }

    // Server
    Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
    Map<FlightDescriptor, DataInMemory> dataInMemory = new HashMap<>();
    Map<String, DataInMemory> mapPojoFlightDataInMemory = new HashMap<>();
    List<ArrowRecordBatch> listArrowRecordBatch = new ArrayList<>();
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
        FlightServer flightServer = FlightServer.builder(allocator, location, 
new NoOpFlightProducer(){
            @Override
            public Runnable acceptPut(CallContext context, FlightStream 
flightStream, StreamListener<PutResult> ackStream) {
                return () -> {
                    long rows = 0;
                    while (flightStream.next()) {
                        VectorUnloader unloader = new 
VectorUnloader(flightStream.getRoot());
                        try (final ArrowRecordBatch arb = 
unloader.getRecordBatch()) {
                            // Retain data information
                            listArrowRecordBatch.add(arb);
                            rows = rows + flightStream.getRoot().getRowCount();
                        }
                    }
                    long finalRows = rows;
                    DataInMemory pojoFlightDataInMemory = new 
DataInMemory(listArrowRecordBatch, flightStream.getSchema(), finalRows);
                    dataInMemory.put(flightStream.getDescriptor(), 
pojoFlightDataInMemory);
                    ackStream.onCompleted();
                };
            }

            @Override
            public void doAction(CallContext context, Action action, 
StreamListener<Result> listener) {
                FlightDescriptor flightDescriptor = FlightDescriptor.path(new 
String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for key 
configured
                if(dataInMemory.containsKey(flightDescriptor)) {
                    switch (action.getType()) {
                        case "DELETE":
                            dataInMemory.remove(flightDescriptor);
                            Result result = new Result("Delete 
completed".getBytes(StandardCharsets.UTF_8));
                            listener.onNext(result);
                    }
                    listener.onCompleted();
                }
            }

            @Override
            public FlightInfo getFlightInfo(CallContext context, 
FlightDescriptor descriptor) {
                if(!dataInMemory.containsKey(descriptor)){
                    throw new IllegalStateException("Unknown descriptor.");
                }
                return new FlightInfo(
                        dataInMemory.get(descriptor).getSchema(),
                        descriptor,
                        Collections.singletonList(new FlightEndpoint(new 
Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), 
location)), // Configure a key to map back and forward your data using Ticket 
argument
                        allocator.getAllocatedMemory(),
                        dataInMemory.get(descriptor).getRows()
                );
            }

            @Override
            public void listFlights(CallContext context, Criteria criteria, 
StreamListener<FlightInfo> listener) {
                dataInMemory.forEach((k, v) -> {
                    FlightInfo flightInfo = getFlightInfo(null, k);
                    listener.onNext(flightInfo);
                    }
                );
                listener.onCompleted();
            }
        }).build();
        try {
            flightServer.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Client
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
        // Populate data
        FlightClient flightClient = FlightClient.builder(allocator, 
location).build();
        Schema schema = new Schema(Arrays.asList( new Field("name", 
FieldType.nullable(new ArrowType.Utf8()), null)));
        VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, 
allocator);
        VarCharVector varCharVector = (VarCharVector) 
vectorSchemaRoot.getVector("name");
        varCharVector.allocateNew(3);
        varCharVector.set(0, "Ronald".getBytes());
        varCharVector.set(1, "David".getBytes());
        varCharVector.set(2, "Francisco".getBytes());
        varCharVector.setValueCount(3);
        vectorSchemaRoot.setRowCount(3);
        FlightClient.ClientStreamListener listener = 
flightClient.startPut(FlightDescriptor.path("profiles"), vectorSchemaRoot, new 
AsyncPutListener());
        listener.putNext();
        vectorSchemaRoot.allocateNew();
        varCharVector.set(0, "Manuel".getBytes());
        varCharVector.set(1, "Felipe".getBytes());
        varCharVector.set(2, "JJ".getBytes());
        varCharVector.setValueCount(3);
        vectorSchemaRoot.setRowCount(3);
        listener.putNext();
        vectorSchemaRoot.clear();
        listener.completed();
        listener.getResult();

        // Do delete action
        Iterator<Result> deleteActionResult = flightClient.doAction(new 
Action("DELETE", 
FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)
 ));
        while(deleteActionResult.hasNext()){
            Result result = deleteActionResult.next();
            System.out.println("Do Delete Action: " + new 
String(result.getBody(), StandardCharsets.UTF_8));
        }

        // Get all metadata information
        Iterable<FlightInfo> flightInfos = 
flightClient.listFlights(Criteria.ALL);
        flightInfos.forEach(t -> System.out.println(t));
        System.out.println("List Flights Info (after delete): No records");
    }

.. testoutput::

    Do Delete Action: Delete completed
    List Flights Info (after delete): No records {code}
 

How it could be:

Only offer to the user the main code but running behind scene all the code 
needed
{code:java}
// Server
@Override
public void doAction(CallContext context, Action action, StreamListener<Result> 
listener) {
    FlightDescriptor flightDescriptor = FlightDescriptor.path(new 
String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for key 
configured
    if(dataInMemory.containsKey(flightDescriptor)) {
        switch (action.getType()) {
            case "DELETE":
                dataInMemory.remove(flightDescriptor);
                Result result = new Result("Delete 
completed".getBytes(StandardCharsets.UTF_8));
                listener.onNext(result);
        }
        listener.onCompleted();
    }
}

// Client
// Do delete action
Iterator<Result> deleteActionResult = flightClient.doAction(new 
Action("DELETE", 
FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)
 ));
while(deleteActionResult.hasNext()){
    Result result = deleteActionResult.next();
    System.out.println("Do Delete Action: " + new String(result.getBody(), 
StandardCharsets.UTF_8));
}

// Get all metadata information
Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
flightInfos.forEach(t -> System.out.println(t));
System.out.println("List Flights Info (after delete): No records");
 {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to