iamsmkr opened a new issue, #218:
URL: https://github.com/apache/arrow-cookbook/issues/218

   I am trying out the cookbook java example 
[here](https://arrow.apache.org/cookbook/java/flight.html). The only change is 
that I am trying to write multiple batches. See "batch" comment in the code.
   
   Upon running this example I am seeing unexpected **overlapping results**!! 
This thing gets wierder with multi-threading.
   Please suggest what is the correct way of sending multiple batches! 
   
   ```
   S1: Server (Location): Listening on port 33333
   C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
   WARNING: An illegal reflective access operation has occurred
   WARNING: Illegal reflective access by 
org.apache.arrow.memory.util.MemoryUtil 
(file:/Users/rentsher/.m2/repository/org/apache/arrow/arrow-memory-core/8.0.0/arrow-memory-core-8.0.0.jar)
 to field java.nio.Buffer.address
   WARNING: Please consider reporting this to the maintainers of 
org.apache.arrow.memory.util.MemoryUtil
   WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
   WARNING: All illegal access operations will be denied in a future release
   C2: Client (Populate Data): Wrote 2 batches with 3 rows each
   C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Int(64, true) not 
null>, descriptor=profiles, 
endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], 
ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
   C4: Client (Get Stream):
   Client Received batch apache/arrow#1, Data:
   vector size: 10
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   Client Received batch apache/arrow#2, Data:
   vector size: 10
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   Client Received batch apache/arrow#3, Data:
   vector size: 10
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   Client Received batch apache/arrow#4, Data:
   vector size: 10
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   Client Received batch apache/arrow#5, Data:
   vector size: 10
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   Client Received batch apache/arrow#6, Data:
   vector size: 10
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Int(64, true) 
not null>, descriptor=profiles, 
endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], 
ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
   C6: Client (Do Delete Action): Delete completed
   C7: Client (List Flights Info): After delete - No records
   C8: Server shut down successfully
   
   Process finished with exit code 0
   ```
   
   ```java
   package com.iamsmkr.arrowflight;
   
   import org.apache.arrow.flight.Action;
   import org.apache.arrow.flight.AsyncPutListener;
   import org.apache.arrow.flight.Criteria;
   import org.apache.arrow.flight.FlightClient;
   import org.apache.arrow.flight.FlightDescriptor;
   import org.apache.arrow.flight.FlightInfo;
   import org.apache.arrow.flight.FlightServer;
   import org.apache.arrow.flight.FlightStream;
   import org.apache.arrow.flight.Location;
   import org.apache.arrow.flight.Result;
   import org.apache.arrow.flight.Ticket;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.BigIntVector;
   import org.apache.arrow.vector.VectorSchemaRoot;
   import org.apache.arrow.vector.holders.NullableVarCharHolder;
   import org.apache.arrow.vector.types.pojo.ArrowType;
   import org.apache.arrow.vector.types.pojo.Field;
   import org.apache.arrow.vector.types.pojo.FieldType;
   import org.apache.arrow.vector.types.pojo.Schema;
   
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import java.util.Iterator;
   
   public class CookbookApp {
   
       public static void main(String[] args) {
   
           Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
           try (BufferAllocator allocator = new RootAllocator()) {
               // Server
               try (FlightServer flightServer = FlightServer.builder(allocator, 
location, new ArrowFlightProducer(allocator, location)).build()) {
                   try {
                       flightServer.start();
                       System.out.println("S1: Server (Location): Listening on 
port " + flightServer.getPort());
                   } catch (IOException e) {
                       System.exit(1);
                   }
   
                   // Client
                   try (FlightClient flightClient = 
FlightClient.builder(allocator, location).build()) {
                       System.out.println("C1: Client (Location): Connected to 
" + location.getUri());
   
                       // Populate data
                       Schema schema = new Schema(Arrays.asList(
                               new Field("name", new FieldType(false, new 
ArrowType.Int(64, true), null), null)));
   
                       try (
                               VectorSchemaRoot vectorSchemaRoot = 
VectorSchemaRoot.create(schema, allocator);
                               BigIntVector names = (BigIntVector) 
vectorSchemaRoot.getVector("name")
                       ) {
                           FlightClient.ClientStreamListener listener =
                                   flightClient.startPut(
                                           FlightDescriptor.path("profiles"),
                                           vectorSchemaRoot,
                                           new AsyncPutListener()
                                   );
   
                           // Batch 1
                           int j = 0;
                           for (long i = 0; i < 10; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           // Batch 2
                           j = 0;
                           for (long i = 10; i < 20; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           // Batch 3
                           j = 0;
                           for (long i = 20; i < 30; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           // Batch 4
                           j = 0;
                           for (long i = 30; i < 40; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           // Batch 5
                           j = 0;
                           for (long i = 40; i < 50; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           // Batch 6
                           j = 0;
                           for (long i = 50; i < 60; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           listener.completed();
                           listener.getResult();
                           
                           System.out.println("C2: Client (Populate Data): 
Wrote 2 batches with 3 rows each");
                       }
   
                       // Get metadata information
                       FlightInfo flightInfo = 
flightClient.getInfo(FlightDescriptor.path("profiles"));
                       System.out.println("C3: Client (Get Metadata): " + 
flightInfo);
   
                       // Get data information
                       try (FlightStream flightStream = 
flightClient.getStream(new Ticket(
                               
FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8))))
 {
                           int batch = 0;
                           try (
                                   VectorSchemaRoot vectorSchemaRootReceived = 
flightStream.getRoot();
                                   BigIntVector names = (BigIntVector) 
vectorSchemaRootReceived.getVector("name")
                           ) {
                               System.out.println("C4: Client (Get Stream):");
                               while (flightStream.next()) {
                                   batch++;
                                   System.out.println("Client Received batch #" 
+ batch + ", Data:");
   //                                
System.out.print(vectorSchemaRootReceived.contentToTSVString());
                                   int i = 
vectorSchemaRootReceived.getRowCount();
                                   System.out.println("vector size: " + i);
                                   int j = 0;
                                   while (j < i) {
                                       System.out.println(names.get(j));
   //                                    names.get(j);
   //                                    copy(vcHolder, tmpSB);
   //                                    System.out.println("name" + j + ": " + 
tmpSB);
                                       j++;
                                   }
                               }
                           }
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
   
                       // Get all metadata information
                       Iterable<FlightInfo> flightInfosBefore = 
flightClient.listFlights(Criteria.ALL);
                       System.out.print("C5: Client (List Flights Info): ");
                       flightInfosBefore.forEach(t -> System.out.println(t));
   
                       // Do delete action
                       Iterator<Result> deleteActionResult = 
flightClient.doAction(new Action("DELETE",
                               
FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
                       while (deleteActionResult.hasNext()) {
                           Result result = deleteActionResult.next();
                           System.out.println("C6: Client (Do Delete Action): " 
+
                                   new String(result.getBody(), 
StandardCharsets.UTF_8));
                       }
   
                       // Get all metadata information (to validate detele 
action)
                       Iterable<FlightInfo> flightInfos = 
flightClient.listFlights(Criteria.ALL);
                       flightInfos.forEach(System.out::println);
                       System.out.println("C7: Client (List Flights Info): 
After delete - No records");
   
                       // Server shut down
                       flightServer.shutdown();
                       System.out.println("C8: Server shut down successfully");
                   }
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to