Hi Igal,

thanks for your comprehensive reply!

As for 1.: I will try to create a minimal reproduction of the case and share the code with you. It might be a few days until I get around to doing it.

As for 2.: I will definitely give this a try. From the looks of it, this seems to be the solution, and this was the error in my thinking: sending unwrapped messages to external functions...


Best regards and many thanks!

Jan

On 06.07.20 14:11, Igal Shilman wrote:
Hi Jan,

Two followup questions:

1. Looking at the stack trace provided in your email, it does seem like the function type is unavailable, and I'd like to follow up on that. Can you please share your Dockerfile so we have the complete picture? If you are not comfortable sharing it, could you exec into the container and manually validate that the module.yaml is present on both the "worker" image and the "master" image, and that it defines the remote function name correctly?

2. In your original email, the provided router does not route messages of type Any; it forwards them as-is. The remote functions API requires that the message sent to a remote function is of type Any. Can you try something like this:

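import com.google.protobuf.Any;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.Router;

final class EventRouter implements Router<com.google.protobuf.Message> {

    static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
        new FunctionType("demo", "eventCounterPython");
    static final FunctionType JAVA_EVENT_COUNTER_TYPE =
        new FunctionType("demo", "eventCounterJava");

    @Override
    public void route(com.google.protobuf.Message event,
                      Downstream<com.google.protobuf.Message> downstream) {
        // The embedded Java function receives the event as-is.
        downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);

        // The remote Python function must receive the event wrapped in an Any.
        downstream.forward(
            new Address(PYTHON_EVENT_COUNTER_TYPE, "count"),
            Any.pack(event));
    }
}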



In addition, you would have to change the definition of your ingress identifier so that its produced type is com.google.protobuf.Message instead of Event.
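
For the manual check in point 1, a small script run inside each container could list what the module.yaml files declare. This is only a sketch under assumptions: it scans `type:` lines as plain text rather than parsing YAML, the helper names are made up for illustration, and the directory you pass in depends on how your image is built. The split at the last slash mirrors the pairs seen in this thread, where "demo/external/eventCounterPython" corresponds to FunctionType("demo/external", "eventCounterPython").

```python
import os
import re

# Matches a "type: namespace/name" line in a module.yaml.
# Assumption: a plain-text scan is good enough here; this is not a YAML parser.
TYPE_LINE = re.compile(r"^\s*type:\s*(\S+)\s*$")


def declared_function_types(modules_root):
    """Return {path: [typename, ...]} for every module.yaml under modules_root."""
    found = {}
    for dirpath, _dirs, files in os.walk(modules_root):
        if "module.yaml" in files:
            path = os.path.join(dirpath, "module.yaml")
            with open(path, encoding="utf-8") as handle:
                found[path] = [m.group(1) for m in map(TYPE_LINE.match, handle) if m]
    return found


def split_typename(typename):
    """Split "namespace/name" at the last slash into (namespace, name)."""
    namespace, _, name = typename.rpartition("/")
    return namespace, name


def defines(modules_root, namespace, name):
    """True if some module.yaml under modules_root declares the given function type."""
    return any(
        split_typename(typename) == (namespace, name)
        for types in declared_function_types(modules_root).values()
        for typename in types
    )
```

Calling `defines(<modules directory>, "demo", "eventCounterPython")` on both the worker and the master container should return True if the type from the error message is declared there; an empty result from `declared_function_types` would suggest the module.yaml never made it into the image.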


Good luck!
Igal

On Fri, Jul 3, 2020 at 10:09 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

    Hi Igal,

    thanks for your reply. Initially I thought the same thing, but it
    turns out I am able to call the remote function from an embedded
    "wrapper" function using the exact same setup (relevant code
    below). So that's one kind of solution to the problem, but it
    feels like a bit of a hack rather than the idiomatic way to
    solve this...

    From my understanding of address-based communication within
    Flink Stateful Functions, it should be possible to call that
    function from the router directly. But I am probably either
    using the Router wrong or misunderstanding some of the ideas
    behind address-based communication...


    EventRouter.java

    ------------------------------------------------------------------------------------

    final class EventRouter implements Router<Event> {

      @Override
      public void route(Event event, Downstream<Event> downstream) {
        downstream.forward(EventCounterWrapper.TYPE, "_", event);
      }
    }

    --------------------------------------------------------------------------------------


    EventCounterWrapper.java

    ---------------------------------------------------------------------------------------

    public class EventCounterWrapper implements StatefulFunction {

        static final FunctionType TYPE =
            new FunctionType("demo", "eventCounterWrapper");
        public static final FunctionType REMOTE_FUNCTION_TYPE =
            new FunctionType("demo/external", "eventCounterPython");

        @Override
        public void invoke(Context context, Object input) {
            if (input instanceof Event) {
                Event event = (Event) input;
                Any message = Any.pack(event);
                context.send(REMOTE_FUNCTION_TYPE, "_", message);
            }

            if (input instanceof Any) {
                final EventCount eventCount;
                try {
                    eventCount = ((Any) input).unpack(EventCount.class);
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("Unexpected type", e);
                }
                context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
            }
        }
    }

    -----------------------------------------------------------------------------------


    worker.py
    ----------------------------------------------------
    @functions.bind("demo/external/eventCounterPython")
    def handle_event(context, _):
        state = context.state('count').unpack(EventCount)
        if not state:
            state = EventCount()
            state.count = 1
        else:
            state.count += 1
        context.state('count').pack(state)

        envelope = Any()
        envelope.Pack(state)
        context.reply(envelope)
    ----------------------------------------------------


    module.yaml

    ---------------------------------------------------------

    spec:
        functions:
          - function:
              meta:
                kind: http
                type: demo/external/eventCounterPython
              spec:
                endpoint: http://python-worker:8000/statefun
                states:
                  - count

    ---------------------------------------------------------


    Best Regards

    Jan


    On 03.07.20 17:33, Igal Shilman wrote:
    Hi Jan,
    Judging by the exception message, it seems like the function type
    "demo/eventCounterPython" is not known to Stateful Functions.
    This could happen if the module.yaml (provided in your email) was
    accidentally excluded from the resulting artifact (Docker image
    or a jar-with-dependencies).
    Can you please verify that the module.yaml is present at runtime?
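
For the jar-with-dependencies case, one way to check is to list the archive entries, since a jar is a zip file. This is a minimal sketch; the function name is made up for illustration, and where exactly the module.yaml has to sit inside the jar is not something this check decides.

```python
import zipfile


def module_yaml_entries(jar_path):
    """List the archive entries whose file name is module.yaml."""
    with zipfile.ZipFile(jar_path) as jar:
        return [
            entry
            for entry in jar.namelist()
            if entry.rsplit("/", 1)[-1] == "module.yaml"
        ]
```

An empty list for the deployed jar would mean the file was dropped during packaging.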

    Kind regards,
    Igal.

    On Thu, Jul 2, 2020 at 7:29 PM Jan Brusch
    <jan.bru...@neuland-bfi.de> wrote:

        Hi,

        based on Gordon's excellent advice on how to handle JSON messages
        with remote functions
        (https://www.mail-archive.com/user@flink.apache.org/msg34385.html)
        I was able to:

        1) Deserialize JSON messages from a Kafka stream

        2) Route the message to an embedded StatefulFunction

        3) Serialize the resulting Protobuf message to JSON and write it
        back to a Kafka stream

        Now, instead of 2), I would like to route the message to a remote
        function, handle it there, and write it back to the stream as JSON
        via the serializer defined in Java. From my understanding, all this
        should work through address-based communication within the Stateful
        Functions application. Unfortunately, I can't get it to work. See
        the relevant code and error message below. The rest of the project
        structure basically follows the walkthrough example from the
        documentation. Any ideas or input would be greatly appreciated.


        module.yaml:

        --------------------------------

        ...

        spec:
             functions:
               - function:
                   meta:
                     kind: http
                     type: demo/eventCounterPython
                   spec:
                     endpoint: http://python-worker:8000/statefun
                     states:
                       - name: count
                     maxNumBatchRequests: 500

        ...

        ---------------------------------------


        EventIO.java

        ---------------------------------------------

        final class EventIO {

          static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
              new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);

          ....

          EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
            return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
                .withKafkaAddress(kafkaAddress)
                .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
                .build();
          }

          ....

          private static final class GlobalEventCountPythonKafkaSerializer
              implements KafkaEgressSerializer<GlobalEventCount> {

            private static final long serialVersionUID = 1L;

            @Override
            public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
              byte[] key = null;
              byte[] value = null;

              try {
                String json = JsonFormat
                    .printer()
                    .includingDefaultValueFields()
                    .print(eventCount);
                value = json.getBytes();
              } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
              }

              return new ProducerRecord<>("eventCountPython", key, value);
            }
          }

        }

        --------------------------------------------


        EventModule.java:

        --------------------------------------

        public final class EventModule implements StatefulFunctionModule {

          @Override
          public void configure(Map<String, String> globalConfiguration, Binder binder) {

            EventIO ioModule = new EventIO("kafka:9092");

            binder.bindIngress(ioModule.getIngressSpec());

            binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());

            binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
            binder.bindEgress(ioModule.getEventCountPythonEgressSpec());

            binder.bindFunctionProvider(
                EventCountJavaStatefulFunction.TYPE,
                unused -> new EventCountJavaStatefulFunction());
          }
        }

        ------------------------------------------


        EventRouter.java

        -------------------------------------------

        final class EventRouter implements Router<Event> {

          static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
              new FunctionType("demo", "eventCounterPython");
          static final FunctionType JAVA_EVENT_COUNTER_TYPE =
              new FunctionType("demo", "eventCounterJava");

          @Override
          public void route(Event event, Downstream<Event> downstream) {
            downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);
            downstream.forward(
                new Address(PYTHON_EVENT_COUNTER_TYPE, "count"),
                event);
          }
        }

        ------------------------------------------


        worker.py

        -------------------------------------------

        @functions.bind("demo/eventCounterPython")
        def handle_event(context, _):
            state = context.state('count').unpack(GlobalEventCount)
            if not state:
                state = GlobalEventCount()
                state.value = 1
            else:
                state.value += 1
            context.state('count').pack(state)

            egress_message = kafka_egress_record(topic="eventCountPython", value=state)
            context.pack_and_send_egress("demo/eventCountPython", egress_message)

        ------------------------------------------


        ERROR MESSAGE

        ------------------------------------------

        worker_1         | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
        worker_1         | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
        worker_1         |     at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
        worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
        worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
        worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
        worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
        worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
        worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
        worker_1         |     at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
        worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
        worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
        worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
        worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
        worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
        worker_1         |     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
        worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
        worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
        worker_1         |     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
        worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
        worker_1         |     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
        worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
        worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
        worker_1         |     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        worker_1         |     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533

        ------------------------------------------


        Best regards and thanks for taking the time to read all this,

        Jan

--
    neuland – Büro für Informatik GmbH
    Konsul-Smidt-Str. 8g, 28217 Bremen

    Telefon (0421) 380107 57
    Fax (0421) 380107 99
    https://www.neuland-bfi.de

    https://twitter.com/neuland
    https://facebook.com/neulandbfi
    https://xing.com/company/neulandbfi


    Geschäftsführer: Thomas Gebauer, Jan Zander
    Registergericht: Amtsgericht Bremen, HRB 23395 HB
    USt-ID. DE 246585501
