I’m running this script, tutorial.py, taken from the Flink website, locally with:

python tutorial.py
from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink


def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    ds = env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.add_sink(StreamingFileSink
                .for_row_format('/tmp/output', SimpleStringEncoder())
                .build())
    env.execute("tutorial_job")


if __name__ == '__main__':
    tutorial()
It correctly outputs a part file to the /tmp/output directory when I run it
locally. However, when I submit the same job to my Kubernetes session cluster,
there is no output. Any ideas?
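For reference, this is the quick check I use locally to confirm the sink wrote
something (just a sketch; the helper name is mine, and it assumes the sink's
usual layout of bucket subdirectories under /tmp/output containing part files):

```python
import os


def list_part_files(output_dir):
    """Walk the sink's output directory and return the paths of all files
    found, including any in-progress/pending part files."""
    found = []
    for root, _dirs, files in os.walk(output_dir):  # yields nothing if dir is absent
        for name in files:
            found.append(os.path.join(root, name))
    return found


if __name__ == '__main__':
    # Print whatever the job produced, one path per line.
    for path in list_part_files('/tmp/output'):
        print(path)
```

Locally this lists the part file after the job finishes; on the cluster I get
nothing back.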
./bin/flink run \
  --target kubernetes-session \
  -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
  --pyModule tutorial \
  --pyFiles /opt/flink-1.12.0/examples/tutorial.py \
  --detached
--
Robert Cullen
240-475-4490