This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit dfddd49875f8d4750b2f3fa061b12b231dfd0e57
Author: Qiuxia Fan <[email protected]>
AuthorDate: Mon Jun 28 18:08:55 2021 +0800

    fix runWrite
---
 api/fbs/Makefile                  |   2 +-
 api/fbs/v1/Trace_grpc.go          | 164 ++++++++++++++++++++
 banyand/liaison/grpc/grpc.go      | 169 +++++++--------------
 banyand/liaison/grpc/grpc_test.go | 305 ++++++++++++++++++++++++++++++++++----
 banyand/liaison/liaison.go        |   1 +
 banyand/series/service.go         |   8 +-
 pkg/convert/number.go             |   8 +
 7 files changed, 513 insertions(+), 144 deletions(-)

diff --git a/api/fbs/Makefile b/api/fbs/Makefile
index 660aae1..825c495 100644
--- a/api/fbs/Makefile
+++ b/api/fbs/Makefile
@@ -21,4 +21,4 @@ fbs := $(wildcard $(VERSION)/*.fbs)
 
 .PHONY: generate
 generate: $(fbs)
-       flatc --go --gen-onefile --go-namespace $(VERSION) -o $(VERSION) $^
+       flatc --go --gen-onefile --grpc --go-namespace $(VERSION) -o $(VERSION) 
$^
diff --git a/api/fbs/v1/Trace_grpc.go b/api/fbs/v1/Trace_grpc.go
new file mode 100644
index 0000000..7d2cb69
--- /dev/null
+++ b/api/fbs/v1/Trace_grpc.go
@@ -0,0 +1,164 @@
+//Generated by gRPC Go plugin
+//If you make any local changes, they will be lost
+//source: rpc
+
+package v1
+
+import (
+       context "context"
+
+       flatbuffers "github.com/google/flatbuffers/go"
+       grpc "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+)
+
+// Client API for Trace service
+type TraceClient interface {
+       Query(ctx context.Context, in *flatbuffers.Builder,
+               opts ...grpc.CallOption) (*QueryResponse, error)
+       Write(ctx context.Context,
+               opts ...grpc.CallOption) (Trace_WriteClient, error)
+}
+
+type traceClient struct {
+       cc grpc.ClientConnInterface
+}
+
+func NewTraceClient(cc grpc.ClientConnInterface) TraceClient {
+       return &traceClient{cc}
+}
+
+func (c *traceClient) Query(ctx context.Context, in *flatbuffers.Builder,
+       opts ...grpc.CallOption) (*QueryResponse, error) {
+       out := new(QueryResponse)
+       err := c.cc.Invoke(ctx, "/banyandb.v1.Trace/Query", in, out, opts...)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
+func (c *traceClient) Write(ctx context.Context,
+       opts ...grpc.CallOption) (Trace_WriteClient, error) {
+       stream, err := c.cc.NewStream(ctx, &_Trace_serviceDesc.Streams[0], 
"/banyandb.v1.Trace/Write", opts...)
+       if err != nil {
+               return nil, err
+       }
+       x := &traceWriteClient{stream}
+       return x, nil
+}
+
+type Trace_WriteClient interface {
+       Send(*flatbuffers.Builder) error
+       Recv() (*WriteResponse, error)
+       grpc.ClientStream
+}
+
+type traceWriteClient struct {
+       grpc.ClientStream
+}
+
+func (x *traceWriteClient) Send(m *flatbuffers.Builder) error {
+       return x.ClientStream.SendMsg(m)
+}
+
+func (x *traceWriteClient) Recv() (*WriteResponse, error) {
+       m := new(WriteResponse)
+       if err := x.ClientStream.RecvMsg(m); err != nil {
+               return nil, err
+       }
+       return m, nil
+}
+
+// Server API for Trace service
+type TraceServer interface {
+       Query(context.Context, *EntityCriteria) (*flatbuffers.Builder, error)
+       Write(Trace_WriteServer) error
+       mustEmbedUnimplementedTraceServer()
+}
+
+type UnimplementedTraceServer struct {
+}
+
+func (UnimplementedTraceServer) Query(context.Context, *EntityCriteria) 
(*flatbuffers.Builder, error) {
+       return nil, status.Errorf(codes.Unimplemented, "method Query not 
implemented")
+}
+
+func (UnimplementedTraceServer) Write(Trace_WriteServer) error {
+       return status.Errorf(codes.Unimplemented, "method Write not 
implemented")
+}
+
+func (UnimplementedTraceServer) mustEmbedUnimplementedTraceServer() {}
+
+type UnsafeTraceServer interface {
+       mustEmbedUnimplementedTraceServer()
+}
+
+func RegisterTraceServer(s grpc.ServiceRegistrar, srv TraceServer) {
+       s.RegisterService(&_Trace_serviceDesc, srv)
+}
+
+func _Trace_Query_Handler(srv interface{}, ctx context.Context,
+       dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) 
(interface{}, error) {
+       in := new(EntityCriteria)
+       if err := dec(in); err != nil {
+               return nil, err
+       }
+       if interceptor == nil {
+               return srv.(TraceServer).Query(ctx, in)
+       }
+       info := &grpc.UnaryServerInfo{
+               Server:     srv,
+               FullMethod: "/banyandb.v1.Trace/Query",
+       }
+
+       handler := func(ctx context.Context, req interface{}) (interface{}, 
error) {
+               return srv.(TraceServer).Query(ctx, req.(*EntityCriteria))
+       }
+       return interceptor(ctx, in, info, handler)
+}
+func _Trace_Write_Handler(srv interface{}, stream grpc.ServerStream) error {
+       return srv.(TraceServer).Write(&traceWriteServer{stream})
+}
+
+type Trace_WriteServer interface {
+       Send(*flatbuffers.Builder) error
+       Recv() (*WriteEntity, error)
+       grpc.ServerStream
+}
+
+type traceWriteServer struct {
+       grpc.ServerStream
+}
+
+func (x *traceWriteServer) Send(m *flatbuffers.Builder) error {
+       return x.ServerStream.SendMsg(m)
+}
+
+func (x *traceWriteServer) Recv() (*WriteEntity, error) {
+       m := new(WriteEntity)
+       if err := x.ServerStream.RecvMsg(m); err != nil {
+               return nil, err
+       }
+       return m, nil
+}
+
+var _Trace_serviceDesc = grpc.ServiceDesc{
+       ServiceName: "banyandb.v1.Trace",
+       HandlerType: (*TraceServer)(nil),
+       Methods: []grpc.MethodDesc{
+               {
+                       MethodName: "Query",
+                       Handler:    _Trace_Query_Handler,
+               },
+       },
+       Streams: []grpc.StreamDesc{
+               {
+                       StreamName:    "Write",
+                       Handler:       _Trace_Write_Handler,
+                       ServerStreams: true,
+                       ClientStreams: true,
+               },
+       },
+}
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 0e82346..4156efb 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -20,18 +20,15 @@ package grpc
 import (
        "context"
        "fmt"
-       "net"
-       "strconv"
-       "time"
-
-       flatbuffers "github.com/google/flatbuffers/go"
-       grpclib "google.golang.org/grpc"
-       "google.golang.org/grpc/encoding"
-
        v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
+       flatbuffers "github.com/google/flatbuffers/go"
+       grpclib "google.golang.org/grpc"
+       "io"
+       "net"
+       "sync"
 )
 
 type Server struct {
@@ -39,6 +36,7 @@ type Server struct {
        log      *logger.Logger
        ser      *grpclib.Server
        pipeline queue.Queue
+       writeEntity *v1.WriteEntity
 }
 
 func NewServer(ctx context.Context, pipeline queue.Queue) *Server {
@@ -58,7 +56,9 @@ func (s *Server) FlagSet() *run.FlagSet {
 func (s *Server) Validate() error {
        return nil
 }
-
+func init(){
+       //encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{})
+}
 func (s *Server) Serve() error {
        s.log = logger.GetLogger("grpc")
        lis, err := net.Listen("tcp", s.addr)
@@ -66,8 +66,11 @@ func (s *Server) Serve() error {
                s.log.Fatal("Failed to listen", logger.Error(err))
        }
 
-       encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{})
-       s.ser = grpclib.NewServer()
+
+       s.ser = 
grpclib.NewServer(grpclib.CustomCodec(flatbuffers.FlatbuffersCodec{}))
+       //s.ser = grpclib.NewServer()
+
+       v1.RegisterTraceServer(s.ser, &TraceServer{})
 
        return s.ser.Serve(lis)
 }
@@ -77,113 +80,49 @@ func (s *Server) GracefulStop() {
        s.ser.GracefulStop()
 }
 
-func WriteTraces(writeEntity *v1.WriteEntity) []byte {
-       fmt.Println("Write called...")
-       builder := flatbuffers.NewBuilder(0)
-       metaData := writeEntity.MetaData(nil)
-       entityValue := writeEntity.Entity(nil)
-       // Serialize MetaData
-       group, name := builder.CreateString(string(metaData.Group())), 
builder.CreateString(string(metaData.Name()))
-       v1.MetadataStart(builder)
-       v1.MetadataAddGroup(builder, group)
-       v1.MetadataAddName(builder, name)
-       v1.MetadataEnd(builder)
-       // Serialize Fields
-       v1.FieldStart(builder)
-       var fieldList []flatbuffers.UOffsetT
-       for i := 0; i < entityValue.FieldsLength(); i++ {
-               var f v1.Field
-               var str string
-               if ok := entityValue.Fields(&f, i); ok {
-                       unionValueType := new(flatbuffers.Table)
-                       if f.Value(unionValueType) {
-                               valueType := f.ValueType()
-                               if valueType == v1.ValueTypeString {
-                                       unionStr := new(v1.String)
-                                       unionStr.Init(unionValueType.Bytes, 
unionValueType.Pos)
-                                       v1.FieldAddValueType(builder, 
v1.ValueTypeString)
-                                       str = string(unionStr.Value())
-                               } else if valueType == v1.ValueTypeInt {
-                                       unionInt := new(v1.Int)
-                                       unionInt.Init(unionValueType.Bytes, 
unionValueType.Pos)
-                                       v1.FieldAddValueType(builder, 
v1.ValueTypeInt)
-                                       field := 
flatbuffers.UOffsetT(unionInt.Value())
-                                       v1.FieldAddValue(builder, field)
-                                       fieldList = append(fieldList, field)
-                               } else if valueType == v1.ValueTypeStringArray {
-                                       unionStrArray := new(v1.StringArray)
-                                       
unionStrArray.Init(unionValueType.Bytes, unionValueType.Pos)
-                                       l := unionStrArray.ValueLength()
-                                       if l == 1 {
-                                               str += 
string(unionStrArray.Value(0))
-                                       } else {
-                                               str += "["
-                                               for j := 0; j < l; j++ {
-                                                       str += 
string(unionStrArray.Value(j))
-                                               }
-                                               str += "]"
-                                       }
-                               } else if valueType == v1.ValueTypeIntArray {
-                                       unionIntArray := new(v1.IntArray)
-                                       
unionIntArray.Init(unionValueType.Bytes, unionValueType.Pos)
-                                       l := unionIntArray.ValueLength()
-                                       if l == 1 {
-                                               str += 
strconv.FormatInt(unionIntArray.Value(0), 10)
-                                       } else if l > 1{
-                                               str += "["
-                                               for j := 0; j < l; j++ {
-                                                       str += 
strconv.FormatInt(unionIntArray.Value(j), 10)
-                                               }
-                                               str += "]"
-                                       }
-                               }
-                               if valueType == v1.ValueTypeIntArray || 
valueType == v1.ValueTypeStringArray || valueType == v1.ValueTypeString {
-                                       field := builder.CreateString(str)
-                                       v1.FieldAddValue(builder, field)
-                                       fieldList = append(fieldList, field)
-                               }
-                       }
-               }
-       }
-       v1.FieldEnd(builder)
-       // Serialize EntityValue
-       dataBinaryLength := entityValue.DataBinaryLength()
-       v1.EntityStartDataBinaryVector(builder, dataBinaryLength)
-       for i := dataBinaryLength; i >= 0; i-- {
-               builder.PrependByte(byte(i))
-       }
-       dataBinary := builder.EndVector(dataBinaryLength)
-       entityId := builder.CreateString(string(entityValue.EntityId()))
-       v1.EntityValueStart(builder)
-       v1.EntityValueAddEntityId(builder, entityId)
-       time := uint64(time.Now().UnixNano())
-       v1.EntityValueAddTimestampNanoseconds(builder, time)
-       v1.EntityValueAddDataBinary(builder, dataBinary)
-       v1.EntityValueStartFieldsVector(builder, len(fieldList))
-       for val := range fieldList {
-               builder.PrependUOffsetT(flatbuffers.UOffsetT(val))
-       }
-       fields := builder.EndVector(len(fieldList))
-       v1.EntityValueAddFields(builder, fields)
-       v1.EntityValueEnd(builder)
-       trace := v1.WriteEntityEnd(builder)
-
-       builder.Finish(trace)
+//var _ gomock.TestHelper = (*TraceServer)(nil)
 
-       return builder.Bytes[builder.Head():]
+type TraceServer struct {
+       v1.UnimplementedTraceServer
+       writeData []*v1.WriteEntity
+       mu         sync.Mutex
 }
 
-type ComponentBuilderFunc func(*flatbuffers.Builder)
-
-func ReadTraces(funcs ...ComponentBuilderFunc) *v1.Entity {
-       b := flatbuffers.NewBuilder(1024)
-       v1.EntityStart(b)
-       for _, fun := range funcs {
-               fun(b)
+func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
+       for {
+               writeEntity, err := TraceWriteServer.Recv()
+               fmt.Println(writeEntity)
+               if err == io.EOF {
+                       return nil
+               }
+               if err != nil {
+                       return err
+               }
+               t.writeData = append(t.writeData, writeEntity)
+               builder := flatbuffers.NewBuilder(0)
+               v1.WriteResponseStart(builder)
+               builder.Finish(v1.WriteResponseEnd(builder))
+               if err := TraceWriteServer.Send(builder); err != nil {
+                       return err
+               }
+               //writeEntity.Entity().Fields()
+               //writeEntity.MetaData(nil).Group()
+               //serviceID+instanceID
+               //seriesID := hash(fieds, f1, f2)
+               //shardID := shardingFunc(seriesID, shardNum)
+               //queue
+               //for _, l := range t.writeData {
+               //      if err := TraceWriteServer.Send(l); err != nil {
+               //              return err
+               //      }
+               //}
        }
-       entityOffset := v1.EntityEnd(b)
-       b.Finish(entityOffset)
+}
+
+func (t *TraceServer) Query(ctx context.Context, entityCriteria 
*v1.EntityCriteria) (*flatbuffers.Builder, error) {
+       b := flatbuffers.NewBuilder(0)
+       v1.EntityCriteriaStart(b)
+       b.Finish(v1.EntityCriteriaEnd(b))
 
-       buf := b.Bytes[b.Head():]
-       return v1.GetRootAsEntity(buf, 0)
+       return b, nil
 }
diff --git a/banyand/liaison/grpc/grpc_test.go 
b/banyand/liaison/grpc/grpc_test.go
index a5bf5ca..b7d1ef1 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -15,28 +15,140 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package grpc_test
+package grpc
 
 import (
-       flatbuffers "github.com/google/flatbuffers/go"
-
+       "context"
+       "fmt"
        v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
-       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
-       "github.com/stretchr/testify/assert"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       flatbuffers "github.com/google/flatbuffers/go"
+       "google.golang.org/grpc"
+       "io"
+       "log"
        "testing"
+       "time"
 )
 
+var serverAddr = "localhost:17912"
+
 type ComponentBuilderFunc func(*flatbuffers.Builder)
-type criteriaBuilder struct {
+type writeEntityBuilder struct {
        *flatbuffers.Builder
 }
-func NewCriteriaBuilder() *criteriaBuilder {
-       return &criteriaBuilder{
+func NewCriteriaBuilder() *writeEntityBuilder {
+       return &writeEntityBuilder{
                flatbuffers.NewBuilder(1024),
        }
 }
+func (b *writeEntityBuilder) BuildMetaData(group, name string) 
ComponentBuilderFunc {
+       g, n := b.Builder.CreateString(group), b.Builder.CreateString(name)
+       v1.MetadataStart(b.Builder)
+       v1.MetadataAddGroup(b.Builder, g)
+       v1.MetadataAddName(b.Builder, n)
+       metadata := v1.MetadataEnd(b.Builder)
+       return func(b *flatbuffers.Builder) {
+               v1.WriteEntityAddMetaData(b, metadata)
+       }
+}
+
+func (b *writeEntityBuilder) BuildEntity(id string, binary []byte, items 
...interface{}) ComponentBuilderFunc {
+       entityId := b.Builder.CreateString(id)
+       binaryOffset := b.Builder.CreateByteVector(binary)
+       l := len(items)
+       var fieldOffsets []flatbuffers.UOffsetT
+       for i := 0; i < l; i++ {
+               o := b.BuildField(items[i])
+               fieldOffsets = append(fieldOffsets, o)
+       }
+       v1.EntityValueStartFieldsVector(b.Builder, len(fieldOffsets))
+       for i := 0; i < len(fieldOffsets); i++ {
+               b.PrependUOffsetT(fieldOffsets[i])
+       }
+       fields := b.EndVector(len(fieldOffsets))
+       v1.EntityValueStart(b.Builder)
+       v1.EntityValueAddEntityId(b.Builder, entityId)
+       t := uint64(time.Now().UnixNano())
+       v1.EntityValueAddTimestampNanoseconds(b.Builder, t)
+       v1.EntityValueAddDataBinary(b.Builder, binaryOffset)
+       v1.EntityValueAddFields(b.Builder, fields)
+       entity := v1.EntityValueEnd(b.Builder)
+       return func(b *flatbuffers.Builder) {
+               v1.WriteEntityAddEntity(b, entity)
+       }
+}
+
+func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT  
{
+       var ValueTypeOffset flatbuffers.UOffsetT
+       var valType v1.ValueType
+       switch v := val.(type) {
+       case int:
+               ValueTypeOffset = b.buildInt(int64(v))
+               valType = v1.ValueTypeInt
+       case []int:
+               ValueTypeOffset = b.buildInt(convert.IntToInt64(v...)...)
+               valType = v1.ValueTypeIntArray
+       case int64:
+               ValueTypeOffset = b.buildInt(v)
+               valType = v1.ValueTypeInt
+       case []int64:
+               ValueTypeOffset = b.buildInt(v...)
+               valType = v1.ValueTypeIntArray
+       case string:
+               ValueTypeOffset = b.buildStrValueType(v)
+               valType = v1.ValueTypeString
+       case []string:
+               ValueTypeOffset = b.buildStrValueType(v...)
+               valType = v1.ValueTypeStringArray
+       default:
+               panic("not supported values")
+       }
+
+       v1.FieldStart(b.Builder)
+       v1.FieldAddValue(b.Builder, ValueTypeOffset)
+       v1.FieldAddValueType(b.Builder, valType)
+       return v1.FieldEnd(b.Builder)
+}
+
+func (b *writeEntityBuilder) buildStrValueType(values ...string) 
flatbuffers.UOffsetT {
+       var strOffsets []flatbuffers.UOffsetT
+       for i := 0; i < len(values); i++ {
+               strOffsets = append(strOffsets, b.CreateString(values[i]))
+       }
+       v1.StringArrayStartValueVector(b.Builder, len(values))
+       for i := 0; i < len(strOffsets); i++ {
+               b.Builder.PrependUOffsetT(strOffsets[i])
+       }
+       int64Arr := b.Builder.EndVector(len(values))
+       v1.IntArrayStart(b.Builder)
+       v1.IntArrayAddValue(b.Builder, int64Arr)
+       return v1.IntArrayEnd(b.Builder)
+}
+
+func (b *writeEntityBuilder) buildInt(values ...int64) flatbuffers.UOffsetT {
+       v1.IntArrayStartValueVector(b.Builder, len(values))
+       for i := 0; i < len(values); i++ {
+               b.Builder.PrependInt64(values[i])
+       }
+       int64Arr := b.Builder.EndVector(len(values))
 
-func (b *criteriaBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntity 
{
+       v1.IntArrayStart(b.Builder)
+       v1.IntArrayAddValue(b.Builder, int64Arr)
+       return v1.IntArrayEnd(b.Builder)
+}
+
+func (b *writeEntityBuilder) BuildDataBinary(binary []byte) 
flatbuffers.UOffsetT {
+       dataBinaryLength := len(binary)
+       v1.EntityStartDataBinaryVector(b.Builder, dataBinaryLength)
+       for i := dataBinaryLength; i >= 0; i-- {
+               b.Builder.PrependByte(byte(i))
+       }
+       dataBinaryOffset := b.Builder.EndVector(dataBinaryLength)
+
+       return dataBinaryOffset
+}
+
+func (b *writeEntityBuilder) Build(funcs ...ComponentBuilderFunc) 
*v1.WriteEntity {
        v1.WriteEntityStart(b.Builder)
        for _, fun := range funcs {
                fun(b.Builder)
@@ -47,21 +159,164 @@ func (b *criteriaBuilder) Build(funcs 
...ComponentBuilderFunc) *v1.WriteEntity {
        buf := b.Bytes[b.Head():]
        return v1.GetRootAsWriteEntity(buf, 0)
 }
-func BenchmarkWriteTraces(t *testing.T) {
-       tester := assert.New(t)
-       builder := NewCriteriaBuilder()
-       entity := builder.Build(
-       )
-       res := grpc.WriteTraces(entity)
-       //tester.NoError(err)
-       tester.NotNil(res)
-       //tester.NoError(plan.Validate())
+
+func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
+       builder := flatbuffers.NewBuilder(0)
+       metaData := writeEntity.MetaData(nil)
+       entityValue := writeEntity.Entity(nil)
+       // Serialize MetaData
+       group, name := builder.CreateString(string(metaData.Group())), 
builder.CreateString(string(metaData.Name()))
+       v1.MetadataStart(builder)
+       v1.MetadataAddGroup(builder, group)
+       v1.MetadataAddName(builder, name)
+       v1.MetadataEnd(builder)
+       // Serialize Fields
+       var fieldList []flatbuffers.UOffsetT
+       for i := 0; i < 1; i++ {
+               var field v1.Field
+               var str string
+               if ok := entityValue.Fields(&field, i); ok {
+                       unionValueType := new(flatbuffers.Table)
+                       if field.Value(unionValueType) {
+                               valueType := field.ValueType()
+                               if valueType == v1.ValueTypeString {
+                                       unionStr := new(v1.String)
+                                       unionStr.Init(unionValueType.Bytes, 
unionValueType.Pos)
+                                       v1.FieldStart(builder)
+                                       v1.FieldAddValueType(builder, 
v1.ValueTypeString)
+                                       v1.FieldEnd(builder)
+                                       str = string(unionStr.Value())
+                                       f := builder.CreateString(str)
+                                       fieldList = append(fieldList, f)
+                               } else if valueType == v1.ValueTypeInt {
+                                       unionInt := new(v1.Int)
+                                       unionInt.Init(unionValueType.Bytes, 
unionValueType.Pos)
+                                       v1.FieldStart(builder)
+                                       v1.FieldAddValueType(builder, 
v1.ValueTypeInt)
+                                       v1.FieldEnd(builder)
+                                       f := 
flatbuffers.UOffsetT(unionInt.Value())
+                                       //v1.IntAddValue(builder, int64(f))
+                                       fieldList = append(fieldList, f)
+                               } else if valueType == v1.ValueTypeStringArray {
+                                       unionStrArray := new(v1.StringArray)
+                                       
unionStrArray.Init(unionValueType.Bytes, unionValueType.Pos)
+                                       v1.FieldStart(builder)
+                                       v1.FieldAddValueType(builder, 
v1.ValueTypeStringArray)
+                                       v1.FieldEnd(builder)
+                                       l := unionStrArray.ValueLength()
+                                       var offsets []flatbuffers.UOffsetT
+                                       for j := 0; j < l; j++ {
+                                               v := 
builder.CreateString(string(unionStrArray.Value(j)))
+                                               v1.StringArrayStart(builder)
+                                               v1.StringArrayAddValue(builder, 
v)
+                                               offset := 
v1.StringArrayEnd(builder)
+                                               offsets = append(offsets, 
offset)
+                                       }
+                                       v1.StringArrayStartValueVector(builder, 
l)
+                                       for o := range offsets {
+                                               
builder.PrependUOffsetT(flatbuffers.UOffsetT(o))
+                                       }
+                                       f := builder.EndVector(l)
+                                       fieldList = append(fieldList, f)
+                               } else if valueType == v1.ValueTypeIntArray {
+                                       unionIntArray := new(v1.IntArray)
+                                       
unionIntArray.Init(unionValueType.Bytes, unionValueType.Pos)
+                                       v1.FieldStart(builder)
+                                       v1.FieldAddValueType(builder, 
v1.ValueTypeIntArray)
+                                       v1.FieldEnd(builder)
+                                       l := unionIntArray.ValueLength()
+                                       var offsets []flatbuffers.UOffsetT
+                                       for j := 0; j < l; j++ {
+                                               v1.IntArrayStart(builder)
+                                               v1.IntArrayAddValue(builder, 
flatbuffers.UOffsetT(unionIntArray.Value(j)))
+                                               offset := 
v1.StringArrayEnd(builder)
+                                               offsets = append(offsets, 
offset)
+                                       }
+                                       v1.IntArrayStartValueVector(builder, 
len(offsets))
+                                       for o := range offsets {
+                                               
builder.PrependUOffsetT(flatbuffers.UOffsetT(o))
+                                       }
+                                       f := builder.EndVector(len(offsets))
+                                       fieldList = append(fieldList, f)
+                               }
+                       }
+               }
+       }
+       v1.FieldStart(builder)
+       for field := range fieldList {
+               v1.FieldAddValue(builder, flatbuffers.UOffsetT(field))
+       }
+       v1.FieldEnd(builder)
+       // Serialize EntityValue
+       dataBinaryLength := 10
+       v1.EntityStartDataBinaryVector(builder, dataBinaryLength)
+       for i := dataBinaryLength; i >= 0; i-- {
+               builder.PrependByte(byte(i))
+       }
+       dataBinaryP := builder.EndVector(dataBinaryLength)
+       v1.EntityValueStartFieldsVector(builder, len(fieldList))
+       for val := range fieldList {
+               builder.PrependUOffsetT(flatbuffers.UOffsetT(val))
+       }
+       fieldsP := builder.EndVector(len(fieldList))
+       entityId := builder.CreateString(string(entityValue.EntityId()))
+       v1.EntityValueStart(builder)
+       v1.EntityValueAddEntityId(builder, entityId)
+       time := uint64(time.Now().UnixNano())
+       v1.EntityValueAddTimestampNanoseconds(builder, time)
+       v1.EntityValueAddDataBinary(builder, dataBinaryP)
+       v1.EntityValueAddFields(builder, fieldsP)
+       v1.EntityValueEnd(builder)
+       v1.WriteEntityStart(builder)
+       position := v1.WriteEntityEnd(builder)
+       builder.Finish(position)
+
+       return  builder, nil
 }
 
-func BenchmarkReadTraces(t *testing.T) {
-       tester := assert.New(t)
-       builder := NewCriteriaBuilder()
-       entity := builder.Build()
-       res := grpc.ReadTraces(entity)
-       tester.NotNil(res)
-}
\ No newline at end of file
+func Test_grpc_write(t *testing.T) {
+       conn, err := grpc.Dial(serverAddr, grpc.WithInsecure(), 
grpc.WithDefaultCallOptions(grpc.CustomCodecCallOption{Codec: 
flatbuffers.FlatbuffersCodec{}}))
+       if err != nil {
+               log.Fatalf("Failed to connect: %v", err)
+       }
+       defer conn.Close()
+
+       client := v1.NewTraceClient(conn)
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       b := NewCriteriaBuilder()
+       binary := byte(123)
+       entity := b.Build(
+               b.BuildEntity("entityId", []byte{binary}, "service_name", 
"endpoint_id"),
+               b.BuildMetaData("default", "trace"),
+       )
+       fmt.Println(entity)
+       builder, e := runWrite(entity)
+       if e != nil {
+               log.Fatalf("Failed to connect: %v", err)
+       }
+       defer cancel()
+       stream, er := client.Write(ctx)
+       if er != nil {
+               log.Fatalf("%v.runWrite(_) = _, %v", client, err)
+       }
+       //waitc := make(chan struct{})
+       go func() {
+               for {
+                       writeResponse, err := stream.Recv()
+                       if err == io.EOF {
+                               // read done.
+                               //close(waitc)
+                               return
+                       }
+                       if err != nil {
+                               log.Fatalf("Failed to receive data : %v", err)
+                       }
+                       println( writeResponse)
+               }
+       }()
+       if error := stream.Send(builder); error != nil {
+               log.Fatalf("Failed to send a note: %v", err)
+       }
+       stream.CloseSend()
+       //<-waitc
+}
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index 76e8d14..117b361 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -19,6 +19,7 @@ package liaison
 
 import (
        "context"
+
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/run"
diff --git a/banyand/series/service.go b/banyand/series/service.go
index a62c7d0..beb3361 100644
--- a/banyand/series/service.go
+++ b/banyand/series/service.go
@@ -20,6 +20,10 @@ package series
 import (
        "bytes"
        "context"
+       "time"
+
+       "go.uber.org/multierr"
+
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
@@ -27,14 +31,12 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/series/schema/sw"
        "github.com/apache/skywalking-banyandb/banyand/storage"
        "github.com/apache/skywalking-banyandb/pkg/run"
-       "go.uber.org/multierr"
-       "time"
 )
 
 var _ Service = (*service)(nil)
 
 type service struct {
-       db storage.Database
+       db   storage.Database
        addr string
 }
 
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index 9656912..0dbc4ca 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -34,3 +34,11 @@ func Uint32ToBytes(u uint32) []byte {
 func BytesToUint64(b []byte) uint64 {
        return binary.BigEndian.Uint64(b)
 }
+
+func IntToInt64(numbers ...int) []int64 {
+       var arr []int64
+       for i := 0; i < len(numbers); i++ {
+               arr = append(arr, int64(numbers[i]))
+       }
+       return arr
+}
\ No newline at end of file

Reply via email to