[ 
https://issues.apache.org/jira/browse/FLINK-22584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22584:
-----------------------------------
    Labels: developer-experience stale-major  (was: developer-experience)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is a Major, please either assign yourself or give an update. 
Afterwards, please remove the label, or in 7 days the issue will be deprioritized.


> Use protobuf-shaded in StateFun core.
> -------------------------------------
>
>                 Key: FLINK-22584
>                 URL: https://issues.apache.org/jira/browse/FLINK-22584
>             Project: Flink
>          Issue Type: Improvement
>          Components: Stateful Functions
>            Reporter: Igal Shilman
>            Priority: Major
>              Labels: developer-experience, stale-major
>             Fix For: statefun-3.1.0
>
>
> We have the *statefun-protobuf-shaded* module, which was introduced for the 
> remote Java SDK.
> We can use it to shade Protobuf internally, to reduce the dependency surface.
> The major hurdle we need to overcome is that, in embedded functions, we have 
> to be able to accept instances of the user's Protobuf-generated messages.
> For example:
> {code:java}
> UserProfile userProfile = UserProfile.newBuilder().build();
> context.send(..., userProfile);
> {code}
> If we simply use the shaded Protobuf version, we will immediately get a 
> ClassCastException.
> One way to overcome this is to use reflection to find the well-known methods 
> on the generated classes and call toByteArray() / parseFrom() reflectively.
> This, however, will cause a significant slowdown, even when using MethodHandles.
> A small experiment that I previously did with ByteBuddy mitigates this 
> by generating accessors in pre-flight:
> {code:java}
> package org.apache.flink.statefun.flink.common.protobuf.serde;
>
> import static net.bytebuddy.matcher.ElementMatchers.named;
>
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.lang.reflect.InvocationTargetException;
> import java.lang.reflect.Method;
> import net.bytebuddy.ByteBuddy;
> import net.bytebuddy.dynamic.DynamicType;
> import net.bytebuddy.implementation.FixedValue;
> import net.bytebuddy.implementation.MethodCall;
> import net.bytebuddy.implementation.bytecode.assign.Assigner;
>
> final class ReflectiveProtobufSerde {
>
>   @SuppressWarnings({"unchecked", "rawtypes"})
>   static <M> ProtobufSerde<M> ofProtobufGeneratedType(Class<M> type) {
>     try {
>       DynamicType.Unloaded<ProtobufSerde> unloaded = configureByteBuddy(type);
>       // load the generated serde with the class loader of the user's message type
>       Class<? extends ProtobufSerde> writer =
>           unloaded.load(type.getClassLoader()).getLoaded();
>       return (ProtobufSerde<M>) writer.getDeclaredConstructor().newInstance();
>     } catch (Throwable e) {
>       // keep the original failure as the cause instead of swallowing it
>       throw new IllegalArgumentException(
>           "Unable to generate a ProtobufSerde for " + type.getName(), e);
>     }
>   }
>
>   @SuppressWarnings("rawtypes")
>   private static DynamicType.Unloaded<ProtobufSerde> configureByteBuddy(Class<?> type)
>       throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
>     Method writeToMethod = type.getMethod("writeTo", OutputStream.class);
>     Method parseFromMethod = type.getMethod("parseFrom", InputStream.class);
>     Method getSerializedSizeMethod = type.getMethod("getSerializedSize");
>
>     // get the message full name
>     Method getDescriptorMethod = type.getMethod("getDescriptor");
>     Object descriptor = getDescriptorMethod.invoke(null);
>     Method getFullNameMethod = descriptor.getClass().getMethod("getFullName");
>     String messageFullName = (String) getFullNameMethod.invoke(descriptor);
>
>     return new ByteBuddy()
>         .subclass(ProtobufSerde.class)
>         .typeVariable("M", type)
>         .method(named("writeTo"))
>         .intercept(
>             MethodCall.invoke(writeToMethod)
>                 .onArgument(0)
>                 .withArgument(1)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         .method(named("parseFrom"))
>         .intercept(MethodCall.invoke(parseFromMethod).withArgument(0))
>         .method(named("getSerializedSize"))
>         .intercept(
>             MethodCall.invoke(getSerializedSizeMethod)
>                 .onArgument(0)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         .method(named("getMessageFullName"))
>         .intercept(FixedValue.value(messageFullName))
>         .make();
>   }
> }
>  {code}
>  
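
For comparison, the purely reflective alternative that the issue description mentions (looking up the well-known methods on the generated class and calling them through MethodHandles) might look roughly like the sketch below. This is only an illustration under stated assumptions, not code from StateFun: UserProfileStandIn is a hypothetical stand-in for a user's Protobuf-generated class that merely mimics the toByteArray() and static parseFrom(byte[]) methods generated messages expose, and MethodHandleSerdeSketch is an invented name.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a user's generated message class. It only mimics
// the two well-known methods used below; it is not a real Protobuf message.
final class UserProfileStandIn {
  private final String name;

  UserProfileStandIn(String name) {
    this.name = name;
  }

  public String getName() {
    return name;
  }

  public byte[] toByteArray() {
    return name.getBytes(StandardCharsets.UTF_8);
  }

  public static UserProfileStandIn parseFrom(byte[] bytes) {
    return new UserProfileStandIn(new String(bytes, StandardCharsets.UTF_8));
  }
}

// Hypothetical serde that never references the message type at compile time.
final class MethodHandleSerdeSketch<M> {
  private final Class<M> type;
  private final MethodHandle toByteArray;
  private final MethodHandle parseFrom;

  private MethodHandleSerdeSketch(Class<M> type, MethodHandle toByteArray, MethodHandle parseFrom) {
    this.type = type;
    this.toByteArray = toByteArray;
    this.parseFrom = parseFrom;
  }

  // Resolve the well-known methods once, up front, rather than on every call.
  static <M> MethodHandleSerdeSketch<M> of(Class<M> type) {
    try {
      MethodHandles.Lookup lookup = MethodHandles.lookup();
      MethodHandle toByteArray =
          lookup.findVirtual(type, "toByteArray", MethodType.methodType(byte[].class));
      MethodHandle parseFrom =
          lookup.findStatic(type, "parseFrom", MethodType.methodType(type, byte[].class));
      return new MethodHandleSerdeSketch<>(type, toByteArray, parseFrom);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Not a supported message type: " + type.getName(), e);
    }
  }

  byte[] serialize(M message) {
    try {
      // invoke() adapts the erased argument type to the handle's receiver type
      return (byte[]) toByteArray.invoke(message);
    } catch (Throwable e) {
      throw new IllegalStateException("toByteArray() failed for " + type.getName(), e);
    }
  }

  M deserialize(byte[] bytes) {
    try {
      return type.cast(parseFrom.invoke(bytes));
    } catch (Throwable e) {
      throw new IllegalStateException("parseFrom() failed for " + type.getName(), e);
    }
  }

  public static void main(String[] args) {
    MethodHandleSerdeSketch<UserProfileStandIn> serde =
        MethodHandleSerdeSketch.of(UserProfileStandIn.class);
    byte[] bytes = serde.serialize(new UserProfileStandIn("alice"));
    System.out.println(serde.deserialize(bytes).getName()); // prints "alice"
  }
}
```

Every call in this sketch goes through a MethodHandle held in an instance field, which is the kind of indirection the description reports as slow; the ByteBuddy experiment quoted above instead generates a class that calls the methods directly.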



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
