Hi Jan,

Two follow-up questions:

1. Looking at the stack trace in your email, it does seem that the function
type is unavailable, and I'd like to follow up on that. Can you please share
your Dockerfile so that we have the complete picture? If you are not
comfortable sharing it, can you exec into the container and manually verify
that the module.yaml is present on both the "worker" image and the "master"
image, and that it defines the remote function name correctly?

2. In your original email, the provided router does not route messages of
type Any; it forwards them as-is. The remote functions API requires that the
message sent to a remote function is of type Any. Can you try something
like this:

final class EventRouter implements Router<com.google.protobuf.Message> {

    static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
        new FunctionType("demo", "eventCounterPython");
    static final FunctionType JAVA_EVENT_COUNTER_TYPE =
        new FunctionType("demo", "eventCounterJava");

    @Override
    public void route(
        com.google.protobuf.Message event,
        Downstream<com.google.protobuf.Message> downstream) {
        // The embedded Java function receives the event as-is.
        downstream.forward(JAVA_EVENT_COUNTER_TYPE, "count", event);
        // The remote Python function must receive the event wrapped in an Any.
        downstream.forward(
            new Address(PYTHON_EVENT_COUNTER_TYPE, "count"),
            Any.pack(event));
    }
}



In addition, you would have to change the definition of your ingress
identifier so that its produced type is com.google.protobuf.Message
instead of Event.
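
For the manual check in question 1, here is a minimal sketch in Python. It
only scans the file line by line for a "type: ..." entry; it is not a real
YAML parser, and the helper name module_declares_type is made up for this
example. The location of module.yaml inside your image depends on your
Dockerfile, so the path you pass in is an assumption you need to adjust.

```python
from pathlib import Path


def module_declares_type(module_yaml: Path, function_type: str) -> bool:
    """Return True if module_yaml exists and contains `type: <function_type>`.

    Hypothetical helper for illustration only: a plain-text scan,
    not a YAML parser, so it ignores where in the document the key sits.
    """
    if not module_yaml.is_file():
        # The file was not packaged into the image (or the path is wrong).
        return False
    for line in module_yaml.read_text(encoding="utf-8").splitlines():
        # Split "key: value" at the first colon only, so URLs in values survive.
        key, sep, value = line.strip().partition(":")
        if sep and key.strip() == "type" and value.strip() == function_type:
            return True
    return False
```

Run it inside both the "worker" and the "master" container, passing the path
where your Dockerfile copies module.yaml and the type string from the
exception, for example "demo/eventCounterPython". A False result on either
image would explain the "Unknown provider for type" error.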


Good luck!
Igal

On Fri, Jul 3, 2020 at 10:09 PM Jan Brusch <jan.bru...@neuland-bfi.de>
wrote:

> Hi Igal,
>
> thanks for your reply. Initially I thought the same thing, but it turns
> out I am able to call the remote function from an embedded "wrapper"
> function using the exact same setup (relevant code below). So that's one
> kind of solution to the problem. But to me it seems like a bit of a
> hack and not the idiomatic way to solve this...
>
> From my understanding of address-based communication within Flink
> Stateful Functions, I feel like it should be possible to call that function
> from the router directly. But I am probably either using the router wrong
> or misunderstanding some of the ideas behind address-based communication...
>
>
> EventRouter.java
>
>
> ------------------------------------------------------------------------------------
>
> final class EventRouter implements Router<Event> {
>
>   @Override
>   public void route(Event event, Downstream<Event> downstream) {
>     downstream.forward(EventCounterWrapper.TYPE, "_", event);
>   }
> }
>
>
> --------------------------------------------------------------------------------------
>
>
> EventCounterWrapper.java
>
>
> ---------------------------------------------------------------------------------------
>
> public class EventCounterWrapper implements StatefulFunction {
>
>     static final FunctionType TYPE =
>         new FunctionType("demo", "eventCounterWrapper");
>     public static final FunctionType REMOTE_FUNCTION_TYPE =
>         new FunctionType("demo/external", "eventCounterPython");
>
>     @Override
>     public void invoke(Context context, Object input) {
>         if (input instanceof Event) {
>             Event event = (Event) input;
>             Any message = Any.pack(event);
>             context.send(REMOTE_FUNCTION_TYPE, "_", message);
>         }
>
>         if (input instanceof Any) {
>             final EventCount eventCount;
>             try {
>                 eventCount = ((Any) input).unpack(EventCount.class);
>             } catch (InvalidProtocolBufferException e) {
>                 throw new RuntimeException("Unexpected type", e);
>             }
>             context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
>         }
>     }
> }
>
>
> -----------------------------------------------------------------------------------
>
>
> worker.py
> ----------------------------------------------------
> @functions.bind("demo/external/eventCounterPython")
> def handle_event(context, _):
>     state = context.state('count').unpack(EventCount)
>     if not state:
>         state = EventCount()
>         state.count = 1
>     else:
>         state.count += 1
>     context.state('count').pack(state)
>
>     envelope = Any()
>     envelope.Pack(state)
>     context.reply(envelope)
> ----------------------------------------------------
>
>
> module.yaml
>
> ---------------------------------------------------------
>
> spec:
>     functions:
>       - function:
>           meta:
>             kind: http
>             type: demo/external/eventCounterPython
>           spec:
>             endpoint: http://python-worker:8000/statefun
>             states:
>               - count
>
> ---------------------------------------------------------
>
>
> Best Regards
>
> Jan
>
>
> On 03.07.20 17:33, Igal Shilman wrote:
>
> Hi Jan,
> Judging by the exception message it seems like the function type
> "demo/eventCounterPython" is not known to stateful functions.
> This could happen if the module.yaml (provided in your email) was
> accidentally excluded from the resulting artifact (Docker image or
> jar-with-dependencies).
> Can you please verify that the module.yaml is present at runtime?
>
> Kind regards,
> Igal.
>
> On Thu, Jul 2, 2020 at 7:29 PM Jan Brusch <jan.bru...@neuland-bfi.de>
> wrote:
>
>> Hi,
>>
>> based on Gordon's excellent advice on how to handle JSON messages with
>> remote functions
>> (https://www.mail-archive.com/user@flink.apache.org/msg34385.html) I was
>> able to:
>>
>> 1) Deserialize JSON Messages from a Kafka Stream
>>
>> 2) Route the message to an embedded StatefulFunction
>>
>> 3) Serialize the Resulting Protobuf Message to JSON and write it back to
>> a Kafka Stream
>>
>> Now, instead of 2), I would like to route the message to a remote
>> function, handle it there, and write it back to the stream as JSON via
>> the serializer defined in Java. From my understanding, all this should
>> work through address-based communication within the Stateful Functions
>> application. Unfortunately, I can't get it to work. See the relevant code
>> and error message below. The rest of the project structure basically
>> follows the walkthrough example from the documentation. Any ideas or
>> input would be greatly appreciated.
>>
>>
>> module.yaml:
>>
>> --------------------------------
>>
>> ...
>>
>> spec:
>>      functions:
>>        - function:
>>            meta:
>>              kind: http
>>              type: demo/eventCounterPython
>>            spec:
>>              endpoint: http://python-worker:8000/statefun
>>              states:
>>                - name: count
>>              maxNumBatchRequests: 500
>>
>> ...
>>
>> ---------------------------------------
>>
>>
>> EventIO.java
>>
>> ---------------------------------------------
>>
>> final class EventIO {
>>
>>    static final EgressIdentifier<GlobalEventCount> EVENT_COUNT_PYTHON_EGRESS_ID =
>>        new EgressIdentifier<>("demo", "eventCountPython", GlobalEventCount.class);
>>
>>      ....
>>
>>    EgressSpec<GlobalEventCount> getEventCountPythonEgressSpec() {
>>      return KafkaEgressBuilder.forIdentifier(EVENT_COUNT_PYTHON_EGRESS_ID)
>>          .withKafkaAddress(kafkaAddress)
>>          .withSerializer(GlobalEventCountPythonKafkaSerializer.class)
>>          .build();
>>    }
>>
>>    ....
>>
>>    private static final class GlobalEventCountPythonKafkaSerializer
>>        implements KafkaEgressSerializer<GlobalEventCount> {
>>
>>      private static final long serialVersionUID = 1L;
>>
>>      @Override
>>      public ProducerRecord<byte[], byte[]> serialize(GlobalEventCount eventCount) {
>>        byte[] key = null;
>>        byte[] value = null;
>>
>>        try {
>>          String json = JsonFormat
>>                  .printer()
>>                  .includingDefaultValueFields()
>>                  .print(eventCount);
>>          value = json.getBytes();
>>        } catch (InvalidProtocolBufferException e) {
>>          e.printStackTrace();
>>        }
>>
>>        return new ProducerRecord<>("eventCountPython", key, value);
>>      }
>>    }
>>
>> }
>>
>> --------------------------------------------
>>
>>
>> EventModule.java:
>>
>> --------------------------------------
>>
>> public final class EventModule implements StatefulFunctionModule {
>>
>>      @Override
>>      public void configure(Map<String, String> globalConfiguration, Binder binder) {
>>
>>          EventIO ioModule = new EventIO("kafka:9092");
>>
>>          binder.bindIngress(ioModule.getIngressSpec());
>>
>>          binder.bindIngressRouter(EventIO.EVENT_INGRESS_ID, new EventRouter());
>>
>>          binder.bindEgress(ioModule.getEventCountJavaEgressSpec());
>>          binder.bindEgress(ioModule.getEventCountPythonEgressSpec());
>>
>>          binder.bindFunctionProvider(
>>                  EventCountJavaStatefulFunction.TYPE,
>>                  unused -> new EventCountJavaStatefulFunction());
>>      }
>> }
>>
>> ------------------------------------------
>>
>>
>> EventRouter.java
>>
>> -------------------------------------------
>>
>> final class EventRouter implements Router<Event> {
>>
>>      static final FunctionType PYTHON_EVENT_COUNTER_TYPE =
>>              new FunctionType("demo", "eventCounterPython");
>>      static final FunctionType JAVA_EVENT_COUNTER_TYPE =
>>              new FunctionType("demo", "eventCounterJava");
>>
>>      @Override
>>      public void route(Event event, Downstream<Event> downstream) {
>>          downstream.forward(
>>                  JAVA_EVENT_COUNTER_TYPE,
>>                  "count",
>>                  event)
>>          ;
>>          downstream.forward(
>>                  new Address(
>>                          PYTHON_EVENT_COUNTER_TYPE,
>>                          "count"
>>                  ),
>>                  event
>>          );
>>      }
>> }
>>
>> ------------------------------------------
>>
>>
>> worker.py
>>
>> -------------------------------------------
>>
>> @functions.bind("demo/eventCounterPython")
>> def handle_event(context, _):
>>      state = context.state('count').unpack(GlobalEventCount)
>>      if not state:
>>          state = GlobalEventCount()
>>          state.value = 1
>>      else:
>>          state.value += 1
>>      context.state('count').pack(state)
>>
>>      egress_message = kafka_egress_record(
>>          topic="eventCountPython", value=state)
>>      context.pack_and_send_egress("demo/eventCountPython", egress_message)
>>
>> ------------------------------------------
>>
>>
>> ERROR MESSAGE
>>
>> ------------------------------------------
>>
>> worker_1         | 2020-07-02 14:38:35,436 INFO org.apache.flink.runtime.taskmanager.Task - feedback-union -> functions -> (Sink: demo-eventCountPython-egress, Sink: demo-eventCountJava-egress) (1/1) (15ea8a98c215693d6e7d5c80f3e5f8b6) switched from RUNNING to FAILED.
>> worker_1         | java.lang.IllegalArgumentException: Unknown provider for type FunctionType(demo, eventCounterPython)
>> worker_1         |     at org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:44)
>> worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:63)
>> worker_1         |     at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:56)
>> worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73)
>> worker_1         |     at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50)
>> worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:135)
>> worker_1         |     at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:130)
>> worker_1         |     at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:82)
>> worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
>> worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
>> worker_1         |     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
>> worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>> worker_1         |     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>> worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:174)
>> worker_1         |     at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:80)
>> worker_1         |     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>> worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>> worker_1         |     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>> worker_1         |     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>> worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
>> worker_1         |     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>> worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>> worker_1         |     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>> worker_1         |     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>> worker_1         |     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533
>>
>> ------------------------------------------
>>
>>
>> Best regards and thanks for taking the time to read all this,
>>
>> Jan
>>
>> --
> neuland  – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99
> https://www.neuland-bfi.de
> https://twitter.com/neuland
> https://facebook.com/neulandbfi
> https://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
>
>
