This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/master by this push:
     new b5cd78d  [FLINK-21721] Add EgressMessageBuilder
b5cd78d is described below

commit b5cd78d05abb3aecb30fabc5f765ffa2b63db033
Author: Igal Shilman <[email protected]>
AuthorDate: Wed Mar 10 17:03:28 2021 +0100

    [FLINK-21721] Add EgressMessageBuilder
    
    This closes #212.
---
 .../sdk/java/message/EgressMessageBuilder.java     | 92 ++++++++++++++++++++++
 1 file changed, 92 insertions(+)

diff --git 
a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageBuilder.java
 
b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageBuilder.java
new file mode 100644
index 0000000..cb42a1c
--- /dev/null
+++ 
b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.message;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString;
+
+/**
+ * A Custom {@link EgressMessage} builder.
+ *
+ * <p>To use {code Kafka} specific builder please use {@link
+ * org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage}. To use {code 
Kinesis} specific egress
+ * please use {@link 
org.apache.flink.statefun.sdk.egress.generated.KinesisEgress}.
+ *
+ * <p>Use this builder if you need to send message to a custom egress defined 
via the embedded SDK.
+ */
+public final class EgressMessageBuilder {
+  private final TypeName target;
+  private final TypedValue.Builder builder;
+
+  public static EgressMessageBuilder forEgress(TypeName targetEgress) {
+    return new EgressMessageBuilder(targetEgress);
+  }
+
+  private EgressMessageBuilder(TypeName target) {
+    this.target = Objects.requireNonNull(target);
+    this.builder = TypedValue.newBuilder();
+  }
+
+  public EgressMessageBuilder withValue(long value) {
+    return withCustomType(Types.longType(), value);
+  }
+
+  public EgressMessageBuilder withValue(int value) {
+    return withCustomType(Types.integerType(), value);
+  }
+
+  public EgressMessageBuilder withValue(boolean value) {
+    return withCustomType(Types.booleanType(), value);
+  }
+
+  public EgressMessageBuilder withValue(String value) {
+    return withCustomType(Types.stringType(), value);
+  }
+
+  public EgressMessageBuilder withValue(float value) {
+    return withCustomType(Types.floatType(), value);
+  }
+
+  public EgressMessageBuilder withValue(double value) {
+    return withCustomType(Types.doubleType(), value);
+  }
+
+  public <T> EgressMessageBuilder withCustomType(Type<T> customType, T 
element) {
+    Objects.requireNonNull(customType);
+    Objects.requireNonNull(element);
+    TypeSerializer<T> typeSerializer = customType.typeSerializer();
+    
builder.setTypenameBytes(ApiExtension.typeNameByteString(customType.typeName()));
+    Slice serialized = typeSerializer.serialize(element);
+    ByteString serializedByteString = 
SliceProtobufUtil.asByteString(serialized);
+    builder.setValue(serializedByteString);
+    builder.setHasValue(true);
+    return this;
+  }
+
+  public EgressMessage build() {
+    return new EgressMessageWrapper(target, builder.build());
+  }
+}

Reply via email to