Jaehyeon Kim created FLINK-33404:
------------------------------------

             Summary: on_timer method is missing in ProcessFunction and 
CoProcessFunction of Pyflink
                 Key: FLINK-33404
                 URL: https://issues.apache.org/jira/browse/FLINK-33404
             Project: Flink
          Issue Type: Bug
          Components: API / Python
            Reporter: Jaehyeon Kim


Hello,

I find the `on_timer` method is not found in ProcessFunction and 
CoProcessFunction of Pyflink and it causes an error when I register a timer eg)

 ```
  ...
  File 
"/home/jaehyeon/personal/flink-demos/venv/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/input_handler.py",
 line 101, in process_timer
    yield from _emit_results(
  File 
"/home/jaehyeon/personal/flink-demos/venv/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/input_handler.py",
 line 131, in _emit_results
    for result in results:
  File 
"/home/jaehyeon/personal/flink-demos/venv/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/input_handler.py",
 line 114, in _on_processing_time
    yield from self._on_processing_time_func(timestamp, key, namespace)
  File 
"/home/jaehyeon/personal/flink-demos/venv/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/operations.py",
 line 308, in on_processing_time
    return _on_timer(TimeDomain.PROCESSING_TIME, timestamp, key)
  File 
"/home/jaehyeon/personal/flink-demos/venv/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/operations.py",
 line 317, in _on_timer
    return process_function.on_timer(timestamp, on_timer_ctx)
AttributeError: 'ReadingFilter' object has no attribute 'on_timer'

        at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
        at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:332)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:315)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        ... 3 more
```

I'm working on Pyflink 1.17.1 but it would be applicable other versions. 

Can the method be added to the functions?

Cheers,
Jaehyeon



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to