Hi,
I'm trying to understand how to use Python classes in Flink DataStream
pipelines. I'm using Python 3.11 and Flink 1.19.
I've tried running a few simple programs and require some guidance.
Here's the first example:
from dataclasses import dataclass
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# from page import Page  # importing the same class defined in a separate module works
# defining the class in the same file as the stream fails
@dataclass
class Page:
    text: str
    number: int

if __name__ == '__main__':
    config = Configuration()
    # fails with this line commented or uncommented if Page is defined in this file
    # config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)
    # write all the data to one file
    env.set_parallelism(1)
    data_stream = env.from_collection([
        Page(p[1], p[0]) for p in
        [(1, 'hi'), (2, 'hello'), (3, 'hi'),
         (4, 'hello'), (5, 'hi'), (6, 'hello'),
         (6, 'hello')]] * 10000
    )
    ds = data_stream.map(lambda p: Page(p.text, p.number + 1))
    ds.print()
    env.execute()
This consistently fails in both process and thread execution modes.
To make it work I have to define the same Page class in a separate module
and import it; then it works in both modes.
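For completeness, the page.py module I import is just the same dataclass moved out of the main script:

```python
# page.py -- the same Page dataclass, defined outside the __main__ module
from dataclasses import dataclass


@dataclass
class Page:
    text: str
    number: int
```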
Here's a second example, involving a tumbling count window:
from typing import Iterable

from pyflink.common import Configuration, Types
from pyflink.datastream import StreamExecutionEnvironment, WindowFunction
from pyflink.datastream.window import CountWindow

from page import Page


class SumWindowFunction(WindowFunction[Page, Page, str, CountWindow]):
    def apply(self, key: str, window: CountWindow, inputs: Iterable[Page]):
        result = Page('', 0)
        for i in inputs:
            result.text += i.text
            result.number += i.number
        return [result]


if __name__ == '__main__':
    config = Configuration()
    config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)
    # write all the data to one file
    env.set_parallelism(1)
    # define the source
    data_stream = env.from_collection(
        [Page(p[1], p[0]) for p in [
            (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'),
            (6, 'hello'), (6, 'hello')]] * 10000
    )
    ds = data_stream.key_by(lambda x: x.text, key_type=Types.STRING()) \
        .count_window(2) \
        .apply(SumWindowFunction())
    ds.print()
    # submit for execution
    env.execute()
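To show what I expect SumWindowFunction to produce, here is the same reduction as plain Python, outside Flink (the window contents here are made up for illustration):

```python
from dataclasses import dataclass


@dataclass
class Page:
    text: str
    number: int


def sum_window(inputs):
    # same logic as SumWindowFunction.apply: concatenate texts, sum numbers
    result = Page('', 0)
    for i in inputs:
        result.text += i.text
        result.number += i.number
    return [result]


# a hypothetical count window of two 'hi' pages
window = [Page('hi', 1), Page('hi', 3)]
print(sum_window(window))  # [Page(text='hihi', number=4)]
```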
This works in process mode, but fails in thread mode:
Caused by: java.lang.ClassCastException: pemja.core.object.PyIterator cannot be cast to pemja.core.object.PyIterator
    at org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.processElement(AbstractOneInputEmbeddedPythonFunctionOperator.java:156)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:425)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:520)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:110)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
I'd appreciate it if you could point out what I'm doing wrong, or direct me
towards documentation that explains how to do this properly.
Best regards,
Olex