This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit a7dfca8bde7ae6bdd6d6e0d15df9eec435b88893 Author: sjwiesman <sjwies...@gmail.com> AuthorDate: Wed Aug 11 09:18:19 2021 -0500 [FLINK-18810][sdk] Golang remote functions smoke test --- statefun-e2e-tests/pom.xml | 1 + .../statefun-smoke-e2e-golang/pom.xml | 30 + .../src/main/go/CommandInterpreterFn.go | 133 +++ .../src/main/go/SmokeE2EMain.go | 50 ++ .../src/main/go/commands.pb.go | 915 +++++++++++++++++++++ .../src/main/protobuf/commands.proto | 71 ++ .../smoke/golang/SmokeVerificationGolangE2E.java | 74 ++ .../src/test/resources/Dockerfile | 21 + .../src/test/resources/Dockerfile.remote-function | 28 + .../src/test/resources/log4j.properties | 24 + .../src/test/resources/remote-module/module.yaml | 20 + 11 files changed, 1367 insertions(+) diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml index d00e6df..9e098be 100644 --- a/statefun-e2e-tests/pom.xml +++ b/statefun-e2e-tests/pom.xml @@ -40,6 +40,7 @@ under the License. <module>statefun-smoke-e2e-multilang-base</module> <module>statefun-smoke-e2e-multilang-harness</module> <module>statefun-smoke-e2e-java</module> + <module>statefun-smoke-e2e-golang</module> </modules> <build> diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-golang/pom.xml new file mode 100644 index 0000000..049e916 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/pom.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>statefun-smoke-e2e-multilang-base</artifactId> + <version>3.1-SNAPSHOT</version> + <relativePath>../statefun-smoke-e2e-multilang-base/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>statefun-smoke-e2e-golang</artifactId> +</project> \ No newline at end of file diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/CommandInterpreterFn.go b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/CommandInterpreterFn.go new file mode 100644 index 0000000..3d149f0 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/CommandInterpreterFn.go @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "strconv" + "time" +) + +var ( + State = statefun.ValueSpec{ + Name: "state", + ValueType: statefun.Int64Type, + } +) + +func CommandInterpreterFn(ctx statefun.Context, message statefun.Message) error { + if message.Is(sourceCommandsType) { + cmds := &SourceCommand{} + if err := message.As(sourceCommandsType, cmds); err != nil { + return fmt.Errorf("failed to deserialize source commonds: %w", err) + } + return interpret(ctx, cmds.GetCommands()) + } else if message.Is(commandsType) { + cmds := &Commands{} + if err := message.As(commandsType, cmds); err != nil { + return fmt.Errorf("failed to deserialize commonds: %w", err) + } + return interpret(ctx, cmds) + } + + return fmt.Errorf("unrecognized message type %v", message.ValueTypeName()) +} + +func interpret(ctx statefun.Context, cmds *Commands) error { + for _, cmd := range cmds.GetCommand() { + if cmd.GetIncrement() != nil { + modifyState(ctx) + } else if cmd.GetSend() != nil { + send(ctx, cmd.GetSend()) + } else if cmd.GetSendAfter() != nil { + sendAfter(ctx, cmd.GetSendAfter()) + } else if cmd.GetSendEgress() != nil { + sendEgress(ctx) + } else if cmd.GetVerify() != nil { + verify(ctx, cmd.GetVerify()) + } + } + + return nil +} + +func modifyState(ctx statefun.Context) { + var value int64 + ctx.Storage().Get(State, &value) + value += 1 + ctx.Storage().Set(State, value) +} + +func send(ctx statefun.Context, cmd *Command_Send) { + target := statefun.Address{ + FunctionType: commandFn, + Id: fmt.Sprint(cmd.GetTarget()), + } + + message := statefun.MessageBuilder{ + Target: target, + Value: cmd.GetCommands(), + ValueType: commandsType, + } + + ctx.Send(message) +} + +func sendAfter(ctx statefun.Context, cmd *Command_SendAfter) { + target := statefun.Address{ + FunctionType: commandFn, + Id: fmt.Sprint(cmd.GetTarget()), + } + + message := statefun.MessageBuilder{ + Target: target, + Value: cmd.GetCommands(), + ValueType: commandsType, + } + + ctx.SendAfter(time.Duration(1), message) +} + +func sendEgress(ctx statefun.Context) { + message := statefun.GenericEgressBuilder{ + Target: discardEgress, + Value: "discarded-message", + ValueType: statefun.StringType, + } + + ctx.SendEgress(message) +} + +func verify(ctx statefun.Context, verify *Command_Verify) { + id, _ := strconv.Atoi(ctx.Self().Id) + + var actual int64 + ctx.Storage().Get(State, &actual) + + message := statefun.GenericEgressBuilder{ + Target: verificationEgress, + Value: &VerificationResult{ + Id: int32(id), + Expected: verify.GetExpected(), + Actual: actual, + }, + ValueType: verificationType, + } + + ctx.SendEgress(message) +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/SmokeE2EMain.go b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/SmokeE2EMain.go new file mode 100644 index 0000000..c9fb811 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/SmokeE2EMain.go @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "log" + "net/http" +) + +var ( + appNamespace = "statefun.smoke.e2e" + commandFn, _ = statefun.TypeNameFromParts(appNamespace, "command-interpreter-fn") + discardEgress, _ = statefun.TypeNameFromParts(appNamespace, "discard-sink") + verificationEgress, _ = statefun.TypeNameFromParts(appNamespace, "verification-sink") + commandsTypeName, _ = statefun.TypeNameFromParts(appNamespace, "commands") + sourceCommandsTypeName, _ = statefun.TypeNameFromParts(appNamespace, "source-command") + verificationTypeName, _ = statefun.TypeNameFromParts(appNamespace, "verification-result") + commandsType = statefun.MakeProtobufTypeWithTypeName(commandsTypeName) + sourceCommandsType = statefun.MakeProtobufTypeWithTypeName(sourceCommandsTypeName) + verificationType = statefun.MakeProtobufTypeWithTypeName(verificationTypeName) +) + +func main() { + spec := statefun.StatefulFunctionSpec{ + FunctionType: commandFn, + States: []statefun.ValueSpec{State}, + Function: statefun.StatefulFunctionPointer(CommandInterpreterFn), + } + + builder := statefun.StatefulFunctionsBuilder() + _ = builder.WithSpec(spec) + + http.Handle("/", builder.AsHandler()) + log.Fatal(http.ListenAndServe(":8000", nil)) +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/commands.pb.go b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/commands.pb.go new file mode 100644 index 0000000..1c4c667 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/commands.pb.go @@ -0,0 +1,915 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.17.3 +// source: commands.proto + +package main + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type SourceCommand struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Target int32 `protobuf:"varint,1,opt,name=target,proto3" json:"target,omitempty"` + Commands *Commands `protobuf:"bytes,2,opt,name=commands,proto3" json:"commands,omitempty"` +} + +func (x *SourceCommand) Reset() { + *x = SourceCommand{} + if protoimpl.UnsafeEnabled { + mi := &file_commands_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SourceCommand) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SourceCommand) ProtoMessage() {} + +func (x *SourceCommand) ProtoReflect() protoreflect.Message { + mi := &file_commands_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SourceCommand.ProtoReflect.Descriptor instead. +func (*SourceCommand) Descriptor() ([]byte, []int) { + return file_commands_proto_rawDescGZIP(), []int{0} +} + +func (x *SourceCommand) GetTarget() int32 { + if x != nil { + return x.Target + } + return 0 +} + +func (x *SourceCommand) GetCommands() *Commands { + if x != nil { + return x.Commands + } + return nil +} + +type Commands struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Command []*Command `protobuf:"bytes,1,rep,name=command,proto3" json:"command,omitempty"` +} + +func (x *Commands) Reset() { + *x = Commands{} + if protoimpl.UnsafeEnabled { + mi := &file_commands_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Commands) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Commands) ProtoMessage() {} + +func (x *Commands) ProtoReflect() protoreflect.Message { + mi := &file_commands_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Commands.ProtoReflect.Descriptor instead. +func (*Commands) Descriptor() ([]byte, []int) { + return file_commands_proto_rawDescGZIP(), []int{1} +} + +func (x *Commands) GetCommand() []*Command { + if x != nil { + return x.Command + } + return nil +} + +type Command struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Command: + // *Command_Increment + // *Command_Send_ + // *Command_SendAfter_ + // *Command_SendEgress_ + // *Command_AsyncOperation_ + // *Command_Verify_ + Command isCommand_Command `protobuf_oneof:"command"` +} + +func (x *Command) Reset() { + *x = Command{} + if protoimpl.UnsafeEnabled { + mi := &file_commands_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Command) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Command) ProtoMessage() {} + +func (x *Command) ProtoReflect() protoreflect.Message { + mi := &file_commands_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Command.ProtoReflect.Descriptor instead. +func (*Command) Descriptor() ([]byte, []int) { + return file_commands_proto_rawDescGZIP(), []int{2} +} + +func (m *Command) GetCommand() isCommand_Command { + if m != nil { + return m.Command + } + return nil +} + +func (x *Command) GetIncrement() *Command_IncrementState { + if x, ok := x.GetCommand().(*Command_Increment); ok { + return x.Increment + } + return nil +} + +func (x *Command) GetSend() *Command_Send { + if x, ok := x.GetCommand().(*Command_Send_); ok { + return x.Send + } + return nil +} + +func (x *Command) GetSendAfter() *Command_SendAfter { + if x, ok := x.GetCommand().(*Command_SendAfter_); ok { + return x.SendAfter + } + return nil +} + +func (x *Command) GetSendEgress() *Command_SendEgress { + if x, ok := x.GetCommand().(*Command_SendEgress_); ok { + return x.SendEgress + } + return nil +} + +func (x *Command) GetAsyncOperation() *Command_AsyncOperation { + if x, ok := x.GetCommand().(*Command_AsyncOperation_); ok { + return x.AsyncOperation + } + return nil +} + +func (x *Command) GetVerify() *Command_Verify { + if x, ok := x.GetCommand().(*Command_Verify_); ok { + return x.Verify + } + return nil +} + +type isCommand_Command interface { + isCommand_Command() +} + +type Command_Increment struct { + Increment *Command_IncrementState `protobuf:"bytes,1,opt,name=increment,proto3,oneof"` +} + +type Command_Send_ struct { + Send *Command_Send `protobuf:"bytes,2,opt,name=send,proto3,oneof"` +} + +type Command_SendAfter_ struct { + SendAfter *Command_SendAfter `protobuf:"bytes,3,opt,name=send_after,json=sendAfter,proto3,oneof"` +} + +type Command_SendEgress_ struct { + SendEgress *Command_SendEgress `protobuf:"bytes,4,opt,name=send_egress,json=sendEgress,proto3,oneof"` +} + +type Command_AsyncOperation_ struct { + AsyncOperation *Command_AsyncOperation `protobuf:"bytes,5,opt,name=async_operation,json=asyncOperation,proto3,oneof"` +} + +type Command_Verify_ struct { + Verify *Command_Verify `protobuf:"bytes,6,opt,name=verify,proto3,oneof"` +} + +func (*Command_Increment) isCommand_Command() {} + +func (*Command_Send_) isCommand_Command() {} + +func (*Command_SendAfter_) isCommand_Command() {} + +func (*Command_SendEgress_) isCommand_Command() {} + +func (*Command_AsyncOperation_) isCommand_Command() {} + +func (*Command_Verify_) isCommand_Command() {} + +type VerificationResult struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Expected int64 `protobuf:"varint,2,opt,name=expected,proto3" json:"expected,omitempty"` + Actual int64 `protobuf:"varint,3,opt,name=actual,proto3" json:"actual,omitempty"` +} + +func (x *VerificationResult) Reset() { + *x = VerificationResult{} + if protoimpl.UnsafeEnabled { + mi := &file_commands_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *VerificationResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*VerificationResult) ProtoMessage() {} + +func (x *VerificationResult) ProtoReflect() protoreflect.Message { + mi := &file_commands_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use VerificationResult.ProtoReflect.Descriptor instead. +func (*VerificationResult) Descriptor() ([]byte, []int) { + return file_commands_proto_rawDescGZIP(), []int{3} +} + +func (x *VerificationResult) GetId() int32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *VerificationResult) GetExpected() int64 { + if x != nil { + return x.Expected + } + return 0 +} + +func (x *VerificationResult) GetActual() int64 { + if x != nil { + return x.Actual + } + return 0 +} + +type Command_IncrementState struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Command_IncrementState) Reset() { + *x = Command_IncrementState{} + if protoimpl.UnsafeEnabled { + mi := &file_commands_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Command_IncrementState) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Command_IncrementState) ProtoMessage() {} + +func (x *Command_IncrementState) ProtoReflect() protoreflect.Message { + mi := &file_commands_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Command_IncrementState.ProtoReflect.Descriptor instead. +func (*Command_IncrementState) Descriptor() ([]byte, []int) { + return file_commands_proto_rawDescGZIP(), []int{2, 0} +} + +type Command_Send struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Target int32 `protobuf:"varint,1,opt,name=target,proto3" json:"target,omitempty"` + Commands *Commands `protobuf:"bytes,2,opt,name=commands,proto3" json:"commands,omitempty"` +} + +func (x *Command_Send) Reset() { + *x = Command_Send{} + if protoimpl.UnsafeEnabled { + mi := &file_commands_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Command_Send) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Command_Send) ProtoMessage() {} + +func (x *Command_Send) ProtoReflect() protoreflect.Message { + mi := &file_commands_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Command_Send.ProtoReflect.Descriptor instead. +func (*Command_Send) Descriptor() ([]byte, []int) { + return file_commands_proto_rawDescGZIP(), []int{2, 1} +} + +func (x *Command_Send) GetTarget() int32 { + if x != nil { + return x.Target + } + return 0 +} + +func (x *Command_Send) GetCommands() *Commands { + if x != nil { + return x.Commands + } + return nil +} + +type Command_SendAfter struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Target int32 `protobuf:"varint,1,opt,name=target,proto3" json:"target,omitempty"` + Commands *Commands `protobuf:"bytes,2,opt,name=commands,proto3" json:"commands,omitempty"` +} + +func (x *Command_SendAfter) Reset() { + *x = Command_SendAfter{} + if protoimpl.UnsafeEnabled { + mi := &file_commands_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Command_SendAfter) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Command_SendAfter) ProtoMessage() {} + +func (x *Command_SendAfter) ProtoReflect() protoreflect.Message { + mi := &file_commands_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Command_SendAfter.ProtoReflect.Descriptor instead. +func (*Command_SendAfter) Descriptor() ([]byte, []int) { + return file_commands_proto_rawDescGZIP(), []int{2, 2} +} + +func (x *Command_SendAfter) GetTarget() int32 { + if x != nil { + return x.Target + } + return 0 +} + +func (x *Command_SendAfter) GetCommands() *Commands { + if x != nil { + return x.Commands + } + return nil +} + +type Command_SendEgress struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Command_SendEgress) Reset() { + *x = Command_SendEgress{} + if protoimpl.UnsafeEnabled { + mi := &file_commands_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Command_SendEgress) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Command_SendEgress) ProtoMessage() {} + +func (x *Command_SendEgress) ProtoReflect() protoreflect.Message { + mi := &file_commands_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Command_SendEgress.ProtoReflect.Descriptor instead. +func (*Command_SendEgress) Descriptor() ([]byte, []int) { + return file_commands_proto_rawDescGZIP(), []int{2, 3} +} + +type Command_AsyncOperation struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Failure bool `protobuf:"varint,1,opt,name=failure,proto3" json:"failure,omitempty"` + ResolvedCommands *Commands `protobuf:"bytes,2,opt,name=resolved_commands,json=resolvedCommands,proto3" json:"resolved_commands,omitempty"` +} + +func (x *Command_AsyncOperation) Reset() { + *x = Command_AsyncOperation{} + if protoimpl.UnsafeEnabled { + mi := &file_commands_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Command_AsyncOperation) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Command_AsyncOperation) ProtoMessage() {} + +func (x *Command_AsyncOperation) ProtoReflect() protoreflect.Message { + mi := &file_commands_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Command_AsyncOperation.ProtoReflect.Descriptor instead. +func (*Command_AsyncOperation) Descriptor() ([]byte, []int) { + return file_commands_proto_rawDescGZIP(), []int{2, 4} +} + +func (x *Command_AsyncOperation) GetFailure() bool { + if x != nil { + return x.Failure + } + return false +} + +func (x *Command_AsyncOperation) GetResolvedCommands() *Commands { + if x != nil { + return x.ResolvedCommands + } + return nil +} + +type Command_Verify struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Expected int64 `protobuf:"varint,1,opt,name=expected,proto3" json:"expected,omitempty"` +} + +func (x *Command_Verify) Reset() { + *x = Command_Verify{} + if protoimpl.UnsafeEnabled { + mi := &file_commands_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Command_Verify) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Command_Verify) ProtoMessage() {} + +func (x *Command_Verify) ProtoReflect() protoreflect.Message { + mi := &file_commands_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Command_Verify.ProtoReflect.Descriptor instead. +func (*Command_Verify) Descriptor() ([]byte, []int) { + return file_commands_proto_rawDescGZIP(), []int{2, 5} +} + +func (x *Command_Verify) GetExpected() int64 { + if x != nil { + return x.Expected + } + return 0 +} + +var File_commands_proto protoreflect.FileDescriptor + +var file_commands_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x23, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, + 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 0x2e, + 0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x22, 0x72, 0x0a, 0x0d, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x43, + 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x49, + 0x0a, 0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x2d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, + 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, + 0x2e, 0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x52, + 0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x22, 0x52, 0x0a, 0x08, 0x43, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x12, 0x46, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, + 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, + 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 0x2e, 0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x22, 0xd0, 0x07, + 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x5b, 0x0a, 0x09, 0x69, 0x6e, 0x63, + 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6f, + 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, + 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 0x2e, 0x73, 0x6d, 0x6f, + 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x49, 0x6e, 0x63, 0x72, 0x65, + 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x48, 0x00, 0x52, 0x09, 0x69, 0x6e, 0x63, + 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x47, 0x0a, 0x04, 0x73, 0x65, 0x6e, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, + 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, + 0x2e, 0x65, 0x32, 0x65, 0x2e, 0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, + 0x6e, 0x64, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x48, 0x00, 0x52, 0x04, 0x73, 0x65, 0x6e, 0x64, 0x12, + 0x57, 0x0a, 0x0a, 0x73, 0x65, 0x6e, 0x64, 0x5f, 0x61, 0x66, 0x74, 0x65, 0x72, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x36, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, + 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, + 0x65, 0x32, 0x65, 0x2e, 0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72, 0x48, 0x00, 0x52, 0x09, 0x73, + 0x65, 0x6e, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72, 0x12, 0x5a, 0x0a, 0x0b, 0x73, 0x65, 0x6e, 0x64, + 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x37, 0x2e, + 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, + 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 0x2e, 0x73, 0x6d, + 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x53, 0x65, 0x6e, 0x64, + 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x48, 0x00, 0x52, 0x0a, 0x73, 0x65, 0x6e, 0x64, 0x45, 0x67, + 0x72, 0x65, 0x73, 0x73, 0x12, 0x66, 0x0a, 0x0f, 0x61, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x6f, 0x70, + 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, + 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, + 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 0x2e, 0x73, 0x6d, + 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x41, 0x73, 0x79, 0x6e, + 0x63, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, 0x0e, 0x61, 0x73, + 0x79, 0x6e, 0x63, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x4d, 0x0a, 0x06, + 0x76, 0x65, 0x72, 0x69, 0x66, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x33, 0x2e, 0x6f, + 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, + 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 0x2e, 0x73, 0x6d, 0x6f, + 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x56, 0x65, 0x72, 0x69, 0x66, + 0x79, 0x48, 0x00, 0x52, 0x06, 0x76, 0x65, 0x72, 0x69, 0x66, 0x79, 0x1a, 0x10, 0x0a, 0x0e, 0x49, + 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x1a, 0x69, 0x0a, + 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x49, 0x0a, + 0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x2d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, + 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 0x2e, + 0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x52, 0x08, + 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x1a, 0x6e, 0x0a, 0x09, 0x53, 0x65, 0x6e, 0x64, + 0x41, 0x66, 0x74, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x49, 0x0a, + 0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x2d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, + 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 0x2e, + 0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x52, 0x08, + 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x1a, 0x0c, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, + 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x1a, 0x86, 0x01, 0x0a, 0x0e, 0x41, 0x73, 0x79, 0x6e, 0x63, + 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x66, 0x61, 0x69, + 0x6c, 0x75, 0x72, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x66, 0x61, 0x69, 0x6c, + 0x75, 0x72, 0x65, 0x12, 0x5a, 0x0a, 0x11, 0x72, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x64, 0x5f, + 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2d, + 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, + 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 0x2e, 0x73, + 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x52, 0x10, 0x72, + 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x64, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x1a, + 0x24, 0x0a, 0x06, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x78, 0x70, + 0x65, 0x63, 0x74, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x65, 0x78, 0x70, + 0x65, 0x63, 0x74, 0x65, 0x64, 0x42, 0x09, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, + 0x22, 0x58, 0x0a, 0x12, 0x56, 0x65, 0x72, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x78, 0x70, 0x65, 0x63, 0x74, + 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x65, 0x78, 0x70, 0x65, 0x63, 0x74, + 0x65, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x75, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x03, 0x52, 0x06, 0x61, 0x63, 0x74, 0x75, 0x61, 0x6c, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x3b, + 0x6d, 0x61, 0x69, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_commands_proto_rawDescOnce sync.Once + file_commands_proto_rawDescData = file_commands_proto_rawDesc +) + +func file_commands_proto_rawDescGZIP() []byte { + file_commands_proto_rawDescOnce.Do(func() { + file_commands_proto_rawDescData = protoimpl.X.CompressGZIP(file_commands_proto_rawDescData) + }) + return file_commands_proto_rawDescData +} + +var file_commands_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_commands_proto_goTypes = []interface{}{ + (*SourceCommand)(nil), // 0: org.apache.flink.statefun.e2e.smoke.SourceCommand + (*Commands)(nil), // 1: org.apache.flink.statefun.e2e.smoke.Commands + (*Command)(nil), // 2: org.apache.flink.statefun.e2e.smoke.Command + (*VerificationResult)(nil), // 3: org.apache.flink.statefun.e2e.smoke.VerificationResult + (*Command_IncrementState)(nil), // 4: org.apache.flink.statefun.e2e.smoke.Command.IncrementState + (*Command_Send)(nil), // 5: org.apache.flink.statefun.e2e.smoke.Command.Send + (*Command_SendAfter)(nil), // 6: org.apache.flink.statefun.e2e.smoke.Command.SendAfter + (*Command_SendEgress)(nil), // 7: org.apache.flink.statefun.e2e.smoke.Command.SendEgress + (*Command_AsyncOperation)(nil), // 8: org.apache.flink.statefun.e2e.smoke.Command.AsyncOperation + (*Command_Verify)(nil), // 9: org.apache.flink.statefun.e2e.smoke.Command.Verify +} +var file_commands_proto_depIdxs = []int32{ + 1, // 0: org.apache.flink.statefun.e2e.smoke.SourceCommand.commands:type_name -> org.apache.flink.statefun.e2e.smoke.Commands + 2, // 1: org.apache.flink.statefun.e2e.smoke.Commands.command:type_name -> org.apache.flink.statefun.e2e.smoke.Command + 4, // 2: org.apache.flink.statefun.e2e.smoke.Command.increment:type_name -> org.apache.flink.statefun.e2e.smoke.Command.IncrementState + 5, // 3: org.apache.flink.statefun.e2e.smoke.Command.send:type_name -> org.apache.flink.statefun.e2e.smoke.Command.Send + 6, // 4: org.apache.flink.statefun.e2e.smoke.Command.send_after:type_name -> org.apache.flink.statefun.e2e.smoke.Command.SendAfter + 7, // 5: org.apache.flink.statefun.e2e.smoke.Command.send_egress:type_name -> org.apache.flink.statefun.e2e.smoke.Command.SendEgress + 8, // 6: org.apache.flink.statefun.e2e.smoke.Command.async_operation:type_name -> org.apache.flink.statefun.e2e.smoke.Command.AsyncOperation + 9, // 7: org.apache.flink.statefun.e2e.smoke.Command.verify:type_name -> org.apache.flink.statefun.e2e.smoke.Command.Verify + 1, // 8: org.apache.flink.statefun.e2e.smoke.Command.Send.commands:type_name -> org.apache.flink.statefun.e2e.smoke.Commands + 1, // 9: org.apache.flink.statefun.e2e.smoke.Command.SendAfter.commands:type_name -> org.apache.flink.statefun.e2e.smoke.Commands + 1, // 10: org.apache.flink.statefun.e2e.smoke.Command.AsyncOperation.resolved_commands:type_name -> org.apache.flink.statefun.e2e.smoke.Commands + 11, // [11:11] is the sub-list for method output_type + 11, // [11:11] is the sub-list for method input_type + 11, // [11:11] is the sub-list for extension type_name + 11, // [11:11] is the sub-list for extension extendee + 0, // [0:11] is the sub-list for field type_name +} + +func init() { file_commands_proto_init() } +func file_commands_proto_init() { + if File_commands_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_commands_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SourceCommand); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_commands_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Commands); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_commands_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Command); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_commands_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*VerificationResult); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_commands_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Command_IncrementState); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_commands_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Command_Send); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_commands_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Command_SendAfter); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_commands_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Command_SendEgress); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_commands_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Command_AsyncOperation); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_commands_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Command_Verify); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_commands_proto_msgTypes[2].OneofWrappers = []interface{}{ + (*Command_Increment)(nil), + (*Command_Send_)(nil), + (*Command_SendAfter_)(nil), + (*Command_SendEgress_)(nil), + (*Command_AsyncOperation_)(nil), + (*Command_Verify_)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_commands_proto_rawDesc, + NumEnums: 0, + NumMessages: 10, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_commands_proto_goTypes, + DependencyIndexes: file_commands_proto_depIdxs, + MessageInfos: file_commands_proto_msgTypes, + }.Build() + File_commands_proto = out.File + file_commands_proto_rawDesc = nil + file_commands_proto_goTypes = nil + file_commands_proto_depIdxs = nil +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/protobuf/commands.proto b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/protobuf/commands.proto new file mode 100644 index 0000000..bd24477 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/protobuf/commands.proto @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +package org.apache.flink.statefun.e2e.smoke; +option java_package = "org.apache.flink.statefun.e2e.smoke.generated"; +option java_multiple_files = true; +option go_package = ".;app"; + +message SourceCommand { + int32 target = 1; + Commands commands = 2; +} + +message Commands { + repeated Command command = 1; +} + +message Command { + message IncrementState { + } + message Send { + int32 target = 1; + Commands commands = 2; + } + message SendAfter { + int32 target = 1; + Commands commands = 2; + } + message SendEgress { + } + message AsyncOperation { + bool failure = 1; + Commands resolved_commands = 2; + } + message Verify { + int64 expected = 1; + } + + oneof command { + IncrementState increment = 1; + Send send = 2; + SendAfter send_after = 3; + SendEgress send_egress = 4; + AsyncOperation async_operation = 5; + Verify verify = 6; + } +} + +message VerificationResult { + int32 id = 1; + int64 expected = 2; + int64 actual = 3; +} + diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java new file mode 100644 index 0000000..bb0211b --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.e2e.smoke.golang; + +import java.nio.file.Path; +import java.nio.file.Paths; +import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers; +import org.apache.flink.statefun.e2e.smoke.SmokeRunner; +import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.images.builder.ImageFromDockerfile; + +public class SmokeVerificationGolangE2E { + + private static final Logger LOG = LoggerFactory.getLogger(SmokeVerificationGolangE2E.class); + private static final int NUM_WORKERS = 2; + + @Test(timeout = 1_000 * 60 * 10) + public void runWith() throws Throwable { + SmokeRunnerParameters parameters = new SmokeRunnerParameters(); + parameters.setNumberOfFunctionInstances(128); + parameters.setMessageCount(100_000); + parameters.setMaxFailures(1); + + GenericContainer<?> remoteFunction = configureRemoteFunction(); + + StatefulFunctionsAppContainers.Builder builder = + StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS) + .withBuildContextFileFromClasspath("remote-module", "/remote-module/") + .dependsOn(remoteFunction); + + SmokeRunner.run(parameters, builder); + } + + private GenericContainer<?> configureRemoteFunction() { + ImageFromDockerfile remoteFunctionImage = + new ImageFromDockerfile("remote-function-image") + .withFileFromClasspath("Dockerfile", "Dockerfile.remote-function") + .withFileFromPath("source/", goSdkPath()) + .withFileFromPath("smoketest/", remoteFunctionGoSourcePath()); + + return new GenericContainer<>(remoteFunctionImage) + .withNetworkAliases("remote-function-host") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } + + private static Path goSdkPath() { + return Paths.get(System.getProperty("user.dir") + "/../../statefun-sdk-go"); + } + + private static Path remoteFunctionGoSourcePath() { + return Paths.get(System.getProperty("user.dir") + "/src/main/go"); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile new file mode 100644 index 0000000..01b8e6f --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM flink-statefun:3.1-SNAPSHOT + +RUN mkdir -p /opt/statefun/modules/statefun-smoke-e2e +COPY statefun-smoke-e2e-driver.jar /opt/statefun/modules/statefun-smoke-e2e/ +COPY remote-module/ /opt/statefun/modules/statefun-smoke-e2e/ +COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile.remote-function b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile.remote-function new file mode 100644 index 0000000..41f1cf7 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile.remote-function @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM golang:1.16-alpine +RUN apk add --no-cache git + +RUN mkdir -p /app/test/smoketest +WORKDIR /app + +ADD source/ /app/ +ADD smoketest/ /app/test/smoketest/ +RUN ls && cd test/smoketest && go build + +EXPOSE 8000 +CMD ./test/smoketest/smoketest diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/log4j.properties new file mode 100644 index 0000000..fb965d3 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/log4j.properties @@ -0,0 +1,24 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml new file mode 100644 index 0000000..1f9874d --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kind: io.statefun.endpoints.v2/http +spec: + functions: statefun.smoke.e2e/command-interpreter-fn + urlPathTemplate: http://remote-function-host:8000 + maxNumBatchRequests: 10000