This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit a7dfca8bde7ae6bdd6d6e0d15df9eec435b88893
Author: sjwiesman <sjwies...@gmail.com>
AuthorDate: Wed Aug 11 09:18:19 2021 -0500

    [FLINK-18810][sdk] Golang remote functions smoke test
---
 statefun-e2e-tests/pom.xml                         |   1 +
 .../statefun-smoke-e2e-golang/pom.xml              |  30 +
 .../src/main/go/CommandInterpreterFn.go            | 133 +++
 .../src/main/go/SmokeE2EMain.go                    |  50 ++
 .../src/main/go/commands.pb.go                     | 915 +++++++++++++++++++++
 .../src/main/protobuf/commands.proto               |  71 ++
 .../smoke/golang/SmokeVerificationGolangE2E.java   |  74 ++
 .../src/test/resources/Dockerfile                  |  21 +
 .../src/test/resources/Dockerfile.remote-function  |  28 +
 .../src/test/resources/log4j.properties            |  24 +
 .../src/test/resources/remote-module/module.yaml   |  20 +
 11 files changed, 1367 insertions(+)

diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index d00e6df..9e098be 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -40,6 +40,7 @@ under the License.
         <module>statefun-smoke-e2e-multilang-base</module>
         <module>statefun-smoke-e2e-multilang-harness</module>
         <module>statefun-smoke-e2e-java</module>
+        <module>statefun-smoke-e2e-golang</module>
     </modules>
 
     <build>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/pom.xml 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/pom.xml
new file mode 100644
index 0000000..049e916
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/pom.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xmlns="http://maven.apache.org/POM/4.0.0";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>statefun-smoke-e2e-multilang-base</artifactId>
+        <version>3.1-SNAPSHOT</version>
+        
<relativePath>../statefun-smoke-e2e-multilang-base/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-smoke-e2e-golang</artifactId>
+</project>
\ No newline at end of file
diff --git 
a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/CommandInterpreterFn.go
 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/CommandInterpreterFn.go
new file mode 100644
index 0000000..3d149f0
--- /dev/null
+++ 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/CommandInterpreterFn.go
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "fmt"
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+       "strconv"
+       "time"
+)
+
+var (
+       State = statefun.ValueSpec{
+               Name:      "state",
+               ValueType: statefun.Int64Type,
+       }
+)
+
+func CommandInterpreterFn(ctx statefun.Context, message statefun.Message) 
error {
+       if message.Is(sourceCommandsType) {
+               cmds := &SourceCommand{}
+               if err := message.As(sourceCommandsType, cmds); err != nil {
+                       return fmt.Errorf("failed to deserialize source 
commonds: %w", err)
+               }
+               return interpret(ctx, cmds.GetCommands())
+       } else if message.Is(commandsType) {
+               cmds := &Commands{}
+               if err := message.As(commandsType, cmds); err != nil {
+                       return fmt.Errorf("failed to deserialize commonds: %w", 
err)
+               }
+               return interpret(ctx, cmds)
+       }
+
+       return fmt.Errorf("unrecognized message type %v", 
message.ValueTypeName())
+}
+
+func interpret(ctx statefun.Context, cmds *Commands) error {
+       for _, cmd := range cmds.GetCommand() {
+               if cmd.GetIncrement() != nil {
+                       modifyState(ctx)
+               } else if cmd.GetSend() != nil {
+                       send(ctx, cmd.GetSend())
+               } else if cmd.GetSendAfter() != nil {
+                       sendAfter(ctx, cmd.GetSendAfter())
+               } else if cmd.GetSendEgress() != nil {
+                       sendEgress(ctx)
+               } else if cmd.GetVerify() != nil {
+                       verify(ctx, cmd.GetVerify())
+               }
+       }
+
+       return nil
+}
+
+func modifyState(ctx statefun.Context) {
+       var value int64
+       ctx.Storage().Get(State, &value)
+       value += 1
+       ctx.Storage().Set(State, value)
+}
+
+func send(ctx statefun.Context, cmd *Command_Send) {
+       target := statefun.Address{
+               FunctionType: commandFn,
+               Id:           fmt.Sprint(cmd.GetTarget()),
+       }
+
+       message := statefun.MessageBuilder{
+               Target:    target,
+               Value:     cmd.GetCommands(),
+               ValueType: commandsType,
+       }
+
+       ctx.Send(message)
+}
+
+func sendAfter(ctx statefun.Context, cmd *Command_SendAfter) {
+       target := statefun.Address{
+               FunctionType: commandFn,
+               Id:           fmt.Sprint(cmd.GetTarget()),
+       }
+
+       message := statefun.MessageBuilder{
+               Target:    target,
+               Value:     cmd.GetCommands(),
+               ValueType: commandsType,
+       }
+
+       ctx.SendAfter(time.Duration(1), message)
+}
+
+func sendEgress(ctx statefun.Context) {
+       message := statefun.GenericEgressBuilder{
+               Target:    discardEgress,
+               Value:     "discarded-message",
+               ValueType: statefun.StringType,
+       }
+
+       ctx.SendEgress(message)
+}
+
+func verify(ctx statefun.Context, verify *Command_Verify) {
+       id, _ := strconv.Atoi(ctx.Self().Id)
+
+       var actual int64
+       ctx.Storage().Get(State, &actual)
+
+       message := statefun.GenericEgressBuilder{
+               Target: verificationEgress,
+               Value: &VerificationResult{
+                       Id:       int32(id),
+                       Expected: verify.GetExpected(),
+                       Actual:   actual,
+               },
+               ValueType: verificationType,
+       }
+
+       ctx.SendEgress(message)
+}
diff --git 
a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/SmokeE2EMain.go 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/SmokeE2EMain.go
new file mode 100644
index 0000000..c9fb811
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/SmokeE2EMain.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
+       "log"
+       "net/http"
+)
+
+var (
+       appNamespace              = "statefun.smoke.e2e"
+       commandFn, _              = statefun.TypeNameFromParts(appNamespace, 
"command-interpreter-fn")
+       discardEgress, _          = statefun.TypeNameFromParts(appNamespace, 
"discard-sink")
+       verificationEgress, _     = statefun.TypeNameFromParts(appNamespace, 
"verification-sink")
+       commandsTypeName, _       = statefun.TypeNameFromParts(appNamespace, 
"commands")
+       sourceCommandsTypeName, _ = statefun.TypeNameFromParts(appNamespace, 
"source-command")
+       verificationTypeName, _   = statefun.TypeNameFromParts(appNamespace, 
"verification-result")
+       commandsType              = 
statefun.MakeProtobufTypeWithTypeName(commandsTypeName)
+       sourceCommandsType        = 
statefun.MakeProtobufTypeWithTypeName(sourceCommandsTypeName)
+       verificationType          = 
statefun.MakeProtobufTypeWithTypeName(verificationTypeName)
+)
+
+func main() {
+       spec := statefun.StatefulFunctionSpec{
+               FunctionType: commandFn,
+               States:       []statefun.ValueSpec{State},
+               Function:     
statefun.StatefulFunctionPointer(CommandInterpreterFn),
+       }
+
+       builder := statefun.StatefulFunctionsBuilder()
+       _ = builder.WithSpec(spec)
+
+       http.Handle("/", builder.AsHandler())
+       log.Fatal(http.ListenAndServe(":8000", nil))
+}
diff --git 
a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/commands.pb.go 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/commands.pb.go
new file mode 100644
index 0000000..1c4c667
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/go/commands.pb.go
@@ -0,0 +1,915 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+//     protoc-gen-go v1.27.1
+//     protoc        v3.17.3
+// source: commands.proto
+
+package main
+
+import (
+       protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+       protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+       reflect "reflect"
+       sync "sync"
+)
+
+const (
+       // Verify that this generated code is sufficiently up-to-date.
+       _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+       // Verify that runtime/protoimpl is sufficiently up-to-date.
+       _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type SourceCommand struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       Target   int32     `protobuf:"varint,1,opt,name=target,proto3" 
json:"target,omitempty"`
+       Commands *Commands `protobuf:"bytes,2,opt,name=commands,proto3" 
json:"commands,omitempty"`
+}
+
+func (x *SourceCommand) Reset() {
+       *x = SourceCommand{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_commands_proto_msgTypes[0]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *SourceCommand) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SourceCommand) ProtoMessage() {}
+
+func (x *SourceCommand) ProtoReflect() protoreflect.Message {
+       mi := &file_commands_proto_msgTypes[0]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use SourceCommand.ProtoReflect.Descriptor instead.
+func (*SourceCommand) Descriptor() ([]byte, []int) {
+       return file_commands_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *SourceCommand) GetTarget() int32 {
+       if x != nil {
+               return x.Target
+       }
+       return 0
+}
+
+func (x *SourceCommand) GetCommands() *Commands {
+       if x != nil {
+               return x.Commands
+       }
+       return nil
+}
+
+type Commands struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       Command []*Command `protobuf:"bytes,1,rep,name=command,proto3" 
json:"command,omitempty"`
+}
+
+func (x *Commands) Reset() {
+       *x = Commands{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_commands_proto_msgTypes[1]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *Commands) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Commands) ProtoMessage() {}
+
+func (x *Commands) ProtoReflect() protoreflect.Message {
+       mi := &file_commands_proto_msgTypes[1]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use Commands.ProtoReflect.Descriptor instead.
+func (*Commands) Descriptor() ([]byte, []int) {
+       return file_commands_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *Commands) GetCommand() []*Command {
+       if x != nil {
+               return x.Command
+       }
+       return nil
+}
+
+type Command struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       // Types that are assignable to Command:
+       //      *Command_Increment
+       //      *Command_Send_
+       //      *Command_SendAfter_
+       //      *Command_SendEgress_
+       //      *Command_AsyncOperation_
+       //      *Command_Verify_
+       Command isCommand_Command `protobuf_oneof:"command"`
+}
+
+func (x *Command) Reset() {
+       *x = Command{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_commands_proto_msgTypes[2]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *Command) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Command) ProtoMessage() {}
+
+func (x *Command) ProtoReflect() protoreflect.Message {
+       mi := &file_commands_proto_msgTypes[2]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use Command.ProtoReflect.Descriptor instead.
+func (*Command) Descriptor() ([]byte, []int) {
+       return file_commands_proto_rawDescGZIP(), []int{2}
+}
+
+func (m *Command) GetCommand() isCommand_Command {
+       if m != nil {
+               return m.Command
+       }
+       return nil
+}
+
+func (x *Command) GetIncrement() *Command_IncrementState {
+       if x, ok := x.GetCommand().(*Command_Increment); ok {
+               return x.Increment
+       }
+       return nil
+}
+
+func (x *Command) GetSend() *Command_Send {
+       if x, ok := x.GetCommand().(*Command_Send_); ok {
+               return x.Send
+       }
+       return nil
+}
+
+func (x *Command) GetSendAfter() *Command_SendAfter {
+       if x, ok := x.GetCommand().(*Command_SendAfter_); ok {
+               return x.SendAfter
+       }
+       return nil
+}
+
+func (x *Command) GetSendEgress() *Command_SendEgress {
+       if x, ok := x.GetCommand().(*Command_SendEgress_); ok {
+               return x.SendEgress
+       }
+       return nil
+}
+
+func (x *Command) GetAsyncOperation() *Command_AsyncOperation {
+       if x, ok := x.GetCommand().(*Command_AsyncOperation_); ok {
+               return x.AsyncOperation
+       }
+       return nil
+}
+
+func (x *Command) GetVerify() *Command_Verify {
+       if x, ok := x.GetCommand().(*Command_Verify_); ok {
+               return x.Verify
+       }
+       return nil
+}
+
+type isCommand_Command interface {
+       isCommand_Command()
+}
+
+type Command_Increment struct {
+       Increment *Command_IncrementState 
`protobuf:"bytes,1,opt,name=increment,proto3,oneof"`
+}
+
+type Command_Send_ struct {
+       Send *Command_Send `protobuf:"bytes,2,opt,name=send,proto3,oneof"`
+}
+
+type Command_SendAfter_ struct {
+       SendAfter *Command_SendAfter 
`protobuf:"bytes,3,opt,name=send_after,json=sendAfter,proto3,oneof"`
+}
+
+type Command_SendEgress_ struct {
+       SendEgress *Command_SendEgress 
`protobuf:"bytes,4,opt,name=send_egress,json=sendEgress,proto3,oneof"`
+}
+
+type Command_AsyncOperation_ struct {
+       AsyncOperation *Command_AsyncOperation 
`protobuf:"bytes,5,opt,name=async_operation,json=asyncOperation,proto3,oneof"`
+}
+
+type Command_Verify_ struct {
+       Verify *Command_Verify `protobuf:"bytes,6,opt,name=verify,proto3,oneof"`
+}
+
+func (*Command_Increment) isCommand_Command() {}
+
+func (*Command_Send_) isCommand_Command() {}
+
+func (*Command_SendAfter_) isCommand_Command() {}
+
+func (*Command_SendEgress_) isCommand_Command() {}
+
+func (*Command_AsyncOperation_) isCommand_Command() {}
+
+func (*Command_Verify_) isCommand_Command() {}
+
+type VerificationResult struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       Id       int32 `protobuf:"varint,1,opt,name=id,proto3" 
json:"id,omitempty"`
+       Expected int64 `protobuf:"varint,2,opt,name=expected,proto3" 
json:"expected,omitempty"`
+       Actual   int64 `protobuf:"varint,3,opt,name=actual,proto3" 
json:"actual,omitempty"`
+}
+
+func (x *VerificationResult) Reset() {
+       *x = VerificationResult{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_commands_proto_msgTypes[3]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *VerificationResult) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*VerificationResult) ProtoMessage() {}
+
+func (x *VerificationResult) ProtoReflect() protoreflect.Message {
+       mi := &file_commands_proto_msgTypes[3]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use VerificationResult.ProtoReflect.Descriptor instead.
+func (*VerificationResult) Descriptor() ([]byte, []int) {
+       return file_commands_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *VerificationResult) GetId() int32 {
+       if x != nil {
+               return x.Id
+       }
+       return 0
+}
+
+func (x *VerificationResult) GetExpected() int64 {
+       if x != nil {
+               return x.Expected
+       }
+       return 0
+}
+
+func (x *VerificationResult) GetActual() int64 {
+       if x != nil {
+               return x.Actual
+       }
+       return 0
+}
+
+type Command_IncrementState struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+}
+
+func (x *Command_IncrementState) Reset() {
+       *x = Command_IncrementState{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_commands_proto_msgTypes[4]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *Command_IncrementState) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Command_IncrementState) ProtoMessage() {}
+
+func (x *Command_IncrementState) ProtoReflect() protoreflect.Message {
+       mi := &file_commands_proto_msgTypes[4]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use Command_IncrementState.ProtoReflect.Descriptor instead.
+func (*Command_IncrementState) Descriptor() ([]byte, []int) {
+       return file_commands_proto_rawDescGZIP(), []int{2, 0}
+}
+
+type Command_Send struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       Target   int32     `protobuf:"varint,1,opt,name=target,proto3" 
json:"target,omitempty"`
+       Commands *Commands `protobuf:"bytes,2,opt,name=commands,proto3" 
json:"commands,omitempty"`
+}
+
+func (x *Command_Send) Reset() {
+       *x = Command_Send{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_commands_proto_msgTypes[5]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *Command_Send) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Command_Send) ProtoMessage() {}
+
+func (x *Command_Send) ProtoReflect() protoreflect.Message {
+       mi := &file_commands_proto_msgTypes[5]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use Command_Send.ProtoReflect.Descriptor instead.
+func (*Command_Send) Descriptor() ([]byte, []int) {
+       return file_commands_proto_rawDescGZIP(), []int{2, 1}
+}
+
+func (x *Command_Send) GetTarget() int32 {
+       if x != nil {
+               return x.Target
+       }
+       return 0
+}
+
+func (x *Command_Send) GetCommands() *Commands {
+       if x != nil {
+               return x.Commands
+       }
+       return nil
+}
+
+type Command_SendAfter struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       Target   int32     `protobuf:"varint,1,opt,name=target,proto3" 
json:"target,omitempty"`
+       Commands *Commands `protobuf:"bytes,2,opt,name=commands,proto3" 
json:"commands,omitempty"`
+}
+
+func (x *Command_SendAfter) Reset() {
+       *x = Command_SendAfter{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_commands_proto_msgTypes[6]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *Command_SendAfter) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Command_SendAfter) ProtoMessage() {}
+
+func (x *Command_SendAfter) ProtoReflect() protoreflect.Message {
+       mi := &file_commands_proto_msgTypes[6]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use Command_SendAfter.ProtoReflect.Descriptor instead.
+func (*Command_SendAfter) Descriptor() ([]byte, []int) {
+       return file_commands_proto_rawDescGZIP(), []int{2, 2}
+}
+
+func (x *Command_SendAfter) GetTarget() int32 {
+       if x != nil {
+               return x.Target
+       }
+       return 0
+}
+
+func (x *Command_SendAfter) GetCommands() *Commands {
+       if x != nil {
+               return x.Commands
+       }
+       return nil
+}
+
+type Command_SendEgress struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+}
+
+func (x *Command_SendEgress) Reset() {
+       *x = Command_SendEgress{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_commands_proto_msgTypes[7]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *Command_SendEgress) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Command_SendEgress) ProtoMessage() {}
+
+func (x *Command_SendEgress) ProtoReflect() protoreflect.Message {
+       mi := &file_commands_proto_msgTypes[7]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use Command_SendEgress.ProtoReflect.Descriptor instead.
+func (*Command_SendEgress) Descriptor() ([]byte, []int) {
+       return file_commands_proto_rawDescGZIP(), []int{2, 3}
+}
+
+type Command_AsyncOperation struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       Failure          bool      `protobuf:"varint,1,opt,name=failure,proto3" 
json:"failure,omitempty"`
+       ResolvedCommands *Commands 
`protobuf:"bytes,2,opt,name=resolved_commands,json=resolvedCommands,proto3" 
json:"resolved_commands,omitempty"`
+}
+
+func (x *Command_AsyncOperation) Reset() {
+       *x = Command_AsyncOperation{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_commands_proto_msgTypes[8]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *Command_AsyncOperation) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Command_AsyncOperation) ProtoMessage() {}
+
+func (x *Command_AsyncOperation) ProtoReflect() protoreflect.Message {
+       mi := &file_commands_proto_msgTypes[8]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use Command_AsyncOperation.ProtoReflect.Descriptor instead.
+func (*Command_AsyncOperation) Descriptor() ([]byte, []int) {
+       return file_commands_proto_rawDescGZIP(), []int{2, 4}
+}
+
+func (x *Command_AsyncOperation) GetFailure() bool {
+       if x != nil {
+               return x.Failure
+       }
+       return false
+}
+
+func (x *Command_AsyncOperation) GetResolvedCommands() *Commands {
+       if x != nil {
+               return x.ResolvedCommands
+       }
+       return nil
+}
+
+type Command_Verify struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       Expected int64 `protobuf:"varint,1,opt,name=expected,proto3" 
json:"expected,omitempty"`
+}
+
+func (x *Command_Verify) Reset() {
+       *x = Command_Verify{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_commands_proto_msgTypes[9]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *Command_Verify) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Command_Verify) ProtoMessage() {}
+
+func (x *Command_Verify) ProtoReflect() protoreflect.Message {
+       mi := &file_commands_proto_msgTypes[9]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use Command_Verify.ProtoReflect.Descriptor instead.
+func (*Command_Verify) Descriptor() ([]byte, []int) {
+       return file_commands_proto_rawDescGZIP(), []int{2, 5}
+}
+
+func (x *Command_Verify) GetExpected() int64 {
+       if x != nil {
+               return x.Expected
+       }
+       return 0
+}
+
+var File_commands_proto protoreflect.FileDescriptor
+
+var file_commands_proto_rawDesc = []byte{
+       0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x2e, 0x70, 
0x72, 0x6f, 0x74, 0x6f,
+       0x12, 0x23, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 
0x2e, 0x66, 0x6c, 0x69,
+       0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 
0x65, 0x32, 0x65, 0x2e,
+       0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x22, 0x72, 0x0a, 0x0d, 0x53, 0x6f, 0x75, 
0x72, 0x63, 0x65, 0x43,
+       0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 
0x72, 0x67, 0x65, 0x74,
+       0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 
0x65, 0x74, 0x12, 0x49,
+       0x0a, 0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x18, 0x02, 
0x20, 0x01, 0x28, 0x0b,
+       0x32, 0x2d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 
0x65, 0x2e, 0x66, 0x6c,
+       0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 
0x2e, 0x65, 0x32, 0x65,
+       0x2e, 0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 
0x6e, 0x64, 0x73, 0x52,
+       0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x22, 0x52, 0x0a, 
0x08, 0x43, 0x6f, 0x6d,
+       0x6d, 0x61, 0x6e, 0x64, 0x73, 0x12, 0x46, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 
0x6d, 0x61, 0x6e, 0x64,
+       0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x6f, 0x72, 0x67, 
0x2e, 0x61, 0x70, 0x61,
+       0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74, 
0x61, 0x74, 0x65, 0x66,
+       0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 0x2e, 0x73, 0x6d, 0x6f, 0x6b, 0x65, 
0x2e, 0x43, 0x6f, 0x6d,
+       0x6d, 0x61, 0x6e, 0x64, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 
0x64, 0x22, 0xd0, 0x07,
+       0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x5b, 0x0a, 
0x09, 0x69, 0x6e, 0x63,
+       0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 
0x32, 0x3b, 0x2e, 0x6f,
+       0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 
0x69, 0x6e, 0x6b, 0x2e,
+       0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 
0x2e, 0x73, 0x6d, 0x6f,
+       0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x49, 
0x6e, 0x63, 0x72, 0x65,
+       0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x48, 0x00, 0x52, 
0x09, 0x69, 0x6e, 0x63,
+       0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x47, 0x0a, 0x04, 0x73, 0x65, 
0x6e, 0x64, 0x18, 0x02,
+       0x20, 0x01, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 
0x70, 0x61, 0x63, 0x68,
+       0x65, 0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 
0x65, 0x66, 0x75, 0x6e,
+       0x2e, 0x65, 0x32, 0x65, 0x2e, 0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 
0x6f, 0x6d, 0x6d, 0x61,
+       0x6e, 0x64, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x48, 0x00, 0x52, 0x04, 0x73, 
0x65, 0x6e, 0x64, 0x12,
+       0x57, 0x0a, 0x0a, 0x73, 0x65, 0x6e, 0x64, 0x5f, 0x61, 0x66, 0x74, 0x65, 
0x72, 0x18, 0x03, 0x20,
+       0x01, 0x28, 0x0b, 0x32, 0x36, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 
0x61, 0x63, 0x68, 0x65,
+       0x2e, 0x66, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 
0x66, 0x75, 0x6e, 0x2e,
+       0x65, 0x32, 0x65, 0x2e, 0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 
0x6d, 0x6d, 0x61, 0x6e,
+       0x64, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72, 0x48, 
0x00, 0x52, 0x09, 0x73,
+       0x65, 0x6e, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72, 0x12, 0x5a, 0x0a, 0x0b, 
0x73, 0x65, 0x6e, 0x64,
+       0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 
0x0b, 0x32, 0x37, 0x2e,
+       0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 
0x6c, 0x69, 0x6e, 0x6b,
+       0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 
0x65, 0x2e, 0x73, 0x6d,
+       0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 
0x53, 0x65, 0x6e, 0x64,
+       0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x48, 0x00, 0x52, 0x0a, 0x73, 0x65, 
0x6e, 0x64, 0x45, 0x67,
+       0x72, 0x65, 0x73, 0x73, 0x12, 0x66, 0x0a, 0x0f, 0x61, 0x73, 0x79, 0x6e, 
0x63, 0x5f, 0x6f, 0x70,
+       0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 
0x0b, 0x32, 0x3b, 0x2e,
+       0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 
0x6c, 0x69, 0x6e, 0x6b,
+       0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 
0x65, 0x2e, 0x73, 0x6d,
+       0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 
0x41, 0x73, 0x79, 0x6e,
+       0x63, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x00, 
0x52, 0x0e, 0x61, 0x73,
+       0x79, 0x6e, 0x63, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 
0x12, 0x4d, 0x0a, 0x06,
+       0x76, 0x65, 0x72, 0x69, 0x66, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 
0x32, 0x33, 0x2e, 0x6f,
+       0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x66, 0x6c, 
0x69, 0x6e, 0x6b, 0x2e,
+       0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 0x32, 0x65, 
0x2e, 0x73, 0x6d, 0x6f,
+       0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x56, 
0x65, 0x72, 0x69, 0x66,
+       0x79, 0x48, 0x00, 0x52, 0x06, 0x76, 0x65, 0x72, 0x69, 0x66, 0x79, 0x1a, 
0x10, 0x0a, 0x0e, 0x49,
+       0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 
0x65, 0x1a, 0x69, 0x0a,
+       0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x72, 
0x67, 0x65, 0x74, 0x18,
+       0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 
0x74, 0x12, 0x49, 0x0a,
+       0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x18, 0x02, 0x20, 
0x01, 0x28, 0x0b, 0x32,
+       0x2d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 
0x2e, 0x66, 0x6c, 0x69,
+       0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 
0x65, 0x32, 0x65, 0x2e,
+       0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 
0x64, 0x73, 0x52, 0x08,
+       0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x1a, 0x6e, 0x0a, 0x09, 
0x53, 0x65, 0x6e, 0x64,
+       0x41, 0x66, 0x74, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x72, 
0x67, 0x65, 0x74, 0x18,
+       0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 
0x74, 0x12, 0x49, 0x0a,
+       0x08, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x18, 0x02, 0x20, 
0x01, 0x28, 0x0b, 0x32,
+       0x2d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 
0x2e, 0x66, 0x6c, 0x69,
+       0x6e, 0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 
0x65, 0x32, 0x65, 0x2e,
+       0x73, 0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 
0x64, 0x73, 0x52, 0x08,
+       0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x1a, 0x0c, 0x0a, 0x0a, 
0x53, 0x65, 0x6e, 0x64,
+       0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x1a, 0x86, 0x01, 0x0a, 0x0e, 0x41, 
0x73, 0x79, 0x6e, 0x63,
+       0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 
0x07, 0x66, 0x61, 0x69,
+       0x6c, 0x75, 0x72, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 
0x66, 0x61, 0x69, 0x6c,
+       0x75, 0x72, 0x65, 0x12, 0x5a, 0x0a, 0x11, 0x72, 0x65, 0x73, 0x6f, 0x6c, 
0x76, 0x65, 0x64, 0x5f,
+       0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x73, 0x18, 0x02, 0x20, 0x01, 
0x28, 0x0b, 0x32, 0x2d,
+       0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 
0x66, 0x6c, 0x69, 0x6e,
+       0x6b, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x66, 0x75, 0x6e, 0x2e, 0x65, 
0x32, 0x65, 0x2e, 0x73,
+       0x6d, 0x6f, 0x6b, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 
0x73, 0x52, 0x10, 0x72,
+       0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x64, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 
0x6e, 0x64, 0x73, 0x1a,
+       0x24, 0x0a, 0x06, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x12, 0x1a, 0x0a, 
0x08, 0x65, 0x78, 0x70,
+       0x65, 0x63, 0x74, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 
0x08, 0x65, 0x78, 0x70,
+       0x65, 0x63, 0x74, 0x65, 0x64, 0x42, 0x09, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 
0x6d, 0x61, 0x6e, 0x64,
+       0x22, 0x58, 0x0a, 0x12, 0x56, 0x65, 0x72, 0x69, 0x66, 0x69, 0x63, 0x61, 
0x74, 0x69, 0x6f, 0x6e,
+       0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 
0x18, 0x01, 0x20, 0x01,
+       0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x78, 
0x70, 0x65, 0x63, 0x74,
+       0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x65, 0x78, 
0x70, 0x65, 0x63, 0x74,
+       0x65, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x75, 0x61, 0x6c, 
0x18, 0x03, 0x20, 0x01,
+       0x28, 0x03, 0x52, 0x06, 0x61, 0x63, 0x74, 0x75, 0x61, 0x6c, 0x42, 0x08, 
0x5a, 0x06, 0x2e, 0x3b,
+       0x6d, 0x61, 0x69, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+       file_commands_proto_rawDescOnce sync.Once
+       file_commands_proto_rawDescData = file_commands_proto_rawDesc
+)
+
+func file_commands_proto_rawDescGZIP() []byte {
+       file_commands_proto_rawDescOnce.Do(func() {
+               file_commands_proto_rawDescData = 
protoimpl.X.CompressGZIP(file_commands_proto_rawDescData)
+       })
+       return file_commands_proto_rawDescData
+}
+
+var file_commands_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
+var file_commands_proto_goTypes = []interface{}{
+       (*SourceCommand)(nil),          // 0: 
org.apache.flink.statefun.e2e.smoke.SourceCommand
+       (*Commands)(nil),               // 1: 
org.apache.flink.statefun.e2e.smoke.Commands
+       (*Command)(nil),                // 2: 
org.apache.flink.statefun.e2e.smoke.Command
+       (*VerificationResult)(nil),     // 3: 
org.apache.flink.statefun.e2e.smoke.VerificationResult
+       (*Command_IncrementState)(nil), // 4: 
org.apache.flink.statefun.e2e.smoke.Command.IncrementState
+       (*Command_Send)(nil),           // 5: 
org.apache.flink.statefun.e2e.smoke.Command.Send
+       (*Command_SendAfter)(nil),      // 6: 
org.apache.flink.statefun.e2e.smoke.Command.SendAfter
+       (*Command_SendEgress)(nil),     // 7: 
org.apache.flink.statefun.e2e.smoke.Command.SendEgress
+       (*Command_AsyncOperation)(nil), // 8: 
org.apache.flink.statefun.e2e.smoke.Command.AsyncOperation
+       (*Command_Verify)(nil),         // 9: 
org.apache.flink.statefun.e2e.smoke.Command.Verify
+}
+var file_commands_proto_depIdxs = []int32{
+       1,  // 0: 
org.apache.flink.statefun.e2e.smoke.SourceCommand.commands:type_name -> 
org.apache.flink.statefun.e2e.smoke.Commands
+       2,  // 1: 
org.apache.flink.statefun.e2e.smoke.Commands.command:type_name -> 
org.apache.flink.statefun.e2e.smoke.Command
+       4,  // 2: 
org.apache.flink.statefun.e2e.smoke.Command.increment:type_name -> 
org.apache.flink.statefun.e2e.smoke.Command.IncrementState
+       5,  // 3: org.apache.flink.statefun.e2e.smoke.Command.send:type_name -> 
org.apache.flink.statefun.e2e.smoke.Command.Send
+       6,  // 4: 
org.apache.flink.statefun.e2e.smoke.Command.send_after:type_name -> 
org.apache.flink.statefun.e2e.smoke.Command.SendAfter
+       7,  // 5: 
org.apache.flink.statefun.e2e.smoke.Command.send_egress:type_name -> 
org.apache.flink.statefun.e2e.smoke.Command.SendEgress
+       8,  // 6: 
org.apache.flink.statefun.e2e.smoke.Command.async_operation:type_name -> 
org.apache.flink.statefun.e2e.smoke.Command.AsyncOperation
+       9,  // 7: org.apache.flink.statefun.e2e.smoke.Command.verify:type_name 
-> org.apache.flink.statefun.e2e.smoke.Command.Verify
+       1,  // 8: 
org.apache.flink.statefun.e2e.smoke.Command.Send.commands:type_name -> 
org.apache.flink.statefun.e2e.smoke.Commands
+       1,  // 9: 
org.apache.flink.statefun.e2e.smoke.Command.SendAfter.commands:type_name -> 
org.apache.flink.statefun.e2e.smoke.Commands
+       1,  // 10: 
org.apache.flink.statefun.e2e.smoke.Command.AsyncOperation.resolved_commands:type_name
 -> org.apache.flink.statefun.e2e.smoke.Commands
+       11, // [11:11] is the sub-list for method output_type
+       11, // [11:11] is the sub-list for method input_type
+       11, // [11:11] is the sub-list for extension type_name
+       11, // [11:11] is the sub-list for extension extendee
+       0,  // [0:11] is the sub-list for field type_name
+}
+
+func init() { file_commands_proto_init() }
+func file_commands_proto_init() {
+       if File_commands_proto != nil {
+               return
+       }
+       if !protoimpl.UnsafeEnabled {
+               file_commands_proto_msgTypes[0].Exporter = func(v interface{}, 
i int) interface{} {
+                       switch v := v.(*SourceCommand); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_commands_proto_msgTypes[1].Exporter = func(v interface{}, 
i int) interface{} {
+                       switch v := v.(*Commands); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_commands_proto_msgTypes[2].Exporter = func(v interface{}, 
i int) interface{} {
+                       switch v := v.(*Command); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_commands_proto_msgTypes[3].Exporter = func(v interface{}, 
i int) interface{} {
+                       switch v := v.(*VerificationResult); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_commands_proto_msgTypes[4].Exporter = func(v interface{}, 
i int) interface{} {
+                       switch v := v.(*Command_IncrementState); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_commands_proto_msgTypes[5].Exporter = func(v interface{}, 
i int) interface{} {
+                       switch v := v.(*Command_Send); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_commands_proto_msgTypes[6].Exporter = func(v interface{}, 
i int) interface{} {
+                       switch v := v.(*Command_SendAfter); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_commands_proto_msgTypes[7].Exporter = func(v interface{}, 
i int) interface{} {
+                       switch v := v.(*Command_SendEgress); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_commands_proto_msgTypes[8].Exporter = func(v interface{}, 
i int) interface{} {
+                       switch v := v.(*Command_AsyncOperation); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_commands_proto_msgTypes[9].Exporter = func(v interface{}, 
i int) interface{} {
+                       switch v := v.(*Command_Verify); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+       }
+       file_commands_proto_msgTypes[2].OneofWrappers = []interface{}{
+               (*Command_Increment)(nil),
+               (*Command_Send_)(nil),
+               (*Command_SendAfter_)(nil),
+               (*Command_SendEgress_)(nil),
+               (*Command_AsyncOperation_)(nil),
+               (*Command_Verify_)(nil),
+       }
+       type x struct{}
+       out := protoimpl.TypeBuilder{
+               File: protoimpl.DescBuilder{
+                       GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+                       RawDescriptor: file_commands_proto_rawDesc,
+                       NumEnums:      0,
+                       NumMessages:   10,
+                       NumExtensions: 0,
+                       NumServices:   0,
+               },
+               GoTypes:           file_commands_proto_goTypes,
+               DependencyIndexes: file_commands_proto_depIdxs,
+               MessageInfos:      file_commands_proto_msgTypes,
+       }.Build()
+       File_commands_proto = out.File
+       file_commands_proto_rawDesc = nil
+       file_commands_proto_goTypes = nil
+       file_commands_proto_depIdxs = nil
+}
diff --git 
a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/protobuf/commands.proto 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/protobuf/commands.proto
new file mode 100644
index 0000000..bd24477
--- /dev/null
+++ 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/main/protobuf/commands.proto
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.smoke;
+option java_package = "org.apache.flink.statefun.e2e.smoke.generated";
+option java_multiple_files = true;
+option go_package = ".;app";
+
+message SourceCommand {
+  int32 target = 1;
+  Commands commands = 2;
+}
+
+message Commands {
+  repeated Command command = 1;
+}
+
+message Command {
+  message IncrementState {
+  }
+  message Send {
+    int32 target = 1;
+    Commands commands = 2;
+  }
+  message SendAfter {
+    int32 target = 1;
+    Commands commands = 2;
+  }
+  message SendEgress {
+  }
+  message AsyncOperation {
+    bool failure = 1;
+    Commands resolved_commands = 2;
+  }
+  message Verify {
+    int64 expected = 1;
+  }
+
+  oneof command {
+    IncrementState increment = 1;
+    Send send = 2;
+    SendAfter send_after = 3;
+    SendEgress send_egress = 4;
+    AsyncOperation async_operation = 5;
+    Verify verify = 6;
+  }
+}
+
+message VerificationResult {
+  int32 id = 1;
+  int64 expected = 2;
+  int64 actual = 3;
+}
+
diff --git 
a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java
 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java
new file mode 100644
index 0000000..bb0211b
--- /dev/null
+++ 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke.golang;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunner;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+public class SmokeVerificationGolangE2E {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SmokeVerificationGolangE2E.class);
+  private static final int NUM_WORKERS = 2;
+
+  @Test(timeout = 1_000 * 60 * 10)
+  public void runWith() throws Throwable {
+    SmokeRunnerParameters parameters = new SmokeRunnerParameters();
+    parameters.setNumberOfFunctionInstances(128);
+    parameters.setMessageCount(100_000);
+    parameters.setMaxFailures(1);
+
+    GenericContainer<?> remoteFunction = configureRemoteFunction();
+
+    StatefulFunctionsAppContainers.Builder builder =
+        StatefulFunctionsAppContainers.builder("flink-statefun-cluster", 
NUM_WORKERS)
+            .withBuildContextFileFromClasspath("remote-module", 
"/remote-module/")
+            .dependsOn(remoteFunction);
+
+    SmokeRunner.run(parameters, builder);
+  }
+
+  private GenericContainer<?> configureRemoteFunction() {
+    ImageFromDockerfile remoteFunctionImage =
+        new ImageFromDockerfile("remote-function-image")
+            .withFileFromClasspath("Dockerfile", "Dockerfile.remote-function")
+            .withFileFromPath("source/", goSdkPath())
+            .withFileFromPath("smoketest/", remoteFunctionGoSourcePath());
+
+    return new GenericContainer<>(remoteFunctionImage)
+        .withNetworkAliases("remote-function-host")
+        .withLogConsumer(new Slf4jLogConsumer(LOG));
+  }
+
+  private static Path goSdkPath() {
+    return Paths.get(System.getProperty("user.dir") + 
"/../../statefun-sdk-go");
+  }
+
+  private static Path remoteFunctionGoSourcePath() {
+    return Paths.get(System.getProperty("user.dir") + "/src/main/go");
+  }
+}
diff --git 
a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile
new file mode 100644
index 0000000..01b8e6f
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM flink-statefun:3.1-SNAPSHOT
+
+RUN mkdir -p /opt/statefun/modules/statefun-smoke-e2e
+COPY statefun-smoke-e2e-driver.jar /opt/statefun/modules/statefun-smoke-e2e/
+COPY remote-module/ /opt/statefun/modules/statefun-smoke-e2e/
+COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git 
a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile.remote-function
 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile.remote-function
new file mode 100644
index 0000000..41f1cf7
--- /dev/null
+++ 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/Dockerfile.remote-function
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM golang:1.16-alpine
+RUN apk add --no-cache git
+
+RUN mkdir -p /app/test/smoketest
+WORKDIR /app
+
+ADD source/ /app/
+ADD smoketest/ /app/test/smoketest/
+RUN ls && cd test/smoketest && go build
+
+EXPOSE 8000
+CMD ./test/smoketest/smoketest
diff --git 
a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/log4j.properties
 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb965d3
--- /dev/null
+++ 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
diff --git 
a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml
 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml
new file mode 100644
index 0000000..1f9874d
--- /dev/null
+++ 
b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kind: io.statefun.endpoints.v2/http
+spec:
+  functions: statefun.smoke.e2e/command-interpreter-fn
+  urlPathTemplate: http://remote-function-host:8000
+  maxNumBatchRequests: 10000

Reply via email to