[
https://issues.apache.org/jira/browse/BEAM-11998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334440#comment-17334440
]
zhao commented on BEAM-11998:
-----------------------------
Thanks for the response. I tried that, but had no luck. Perhaps my code is incorrect?
Any suggestions would be greatly appreciated.
Here is the running command:
python -m sample --runner=FlinkRunner --flink_version 1.12 \
  --flink_master=myMaster --environment_type=DOCKER --save_main_session \
  --flink_submit_uber_jar --experiments=use_deprecated_read --streaming
Here is the demo code:
from __future__ import absolute_import

import logging
import typing
import argparse

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run_pipeline(argv=None):
    # No script-level flags are declared, so every argument is forwarded to Beam.
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    p = beam.Pipeline(options=pipeline_options)
    (p
     | 'read from kafka' >> ReadFromKafka(
         # boot_server is the Kafka bootstrap address; its definition is not
         # shown in this snippet.
         consumer_config={'bootstrap.servers': boot_server},
         topics=['test003'])
     | beam.Map(print)
    )
    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run_pipeline()
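The demo code uses boot_server without showing where it is defined. One possible way to supply it is sketched here, assuming a hypothetical --bootstrap_servers flag and a placeholder default address, neither of which is part of the original script. The sketch relies only on argparse's parse_known_args, which returns the declared flags separately from the remaining arguments that would be forwarded to PipelineOptions.

```python
import argparse


def split_args(argv):
    """Separate script-level flags from the flags forwarded to PipelineOptions."""
    parser = argparse.ArgumentParser()
    # Hypothetical flag, not part of the original script.
    parser.add_argument(
        '--bootstrap_servers',
        default='localhost:9092',  # assumed placeholder address
        help='Kafka bootstrap address, e.g. host:port')
    # Flags the parser does not recognize are returned untouched in the
    # second element, ready to be passed to PipelineOptions.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args
```

With this in place, run_pipeline could read known_args.bootstrap_servers instead of relying on a global boot_server, while runner flags such as --runner=FlinkRunner still reach Beam unchanged.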
Here is the running graph. You can clearly see that the reading slot has finished.
!read.png!
> Portable runners should be able to issue checkpoints to Splittable DoFn
> -----------------------------------------------------------------------
>
> Key: BEAM-11998
> URL: https://issues.apache.org/jira/browse/BEAM-11998
> Project: Beam
> Issue Type: New Feature
> Components: cross-language, runner-flink, runner-spark
> Reporter: Boyuan Zhang
> Priority: P2
> Attachments: read.png, read.png
>
>
> To execute an unbounded Splittable DoFn over the Fn API in streaming mode
> properly, portable runners should regularly issue a split
> (ProcessBundleSplitRequest with fraction_of_remainder > 0) or simply a
> checkpoint (ProcessBundleSplitRequest with fraction_of_remainder == 0) to
> the SDK, so that the current bundle finishes processing instead of running
> forever.
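
The difference between a split and a checkpoint described in the issue can be illustrated with a toy model. This is only a sketch under stated assumptions: ToyOffsetTracker and run_bundle are hypothetical names, the split arithmetic is a simplified offset-range rule, and none of this is the actual Beam runner or SDK implementation. It shows that a request with fraction_of_remainder == 0 ends the current bundle right after the element being processed and hands the rest back as a residual.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class ToyOffsetTracker:
    """Hypothetical stand-in for a restriction tracker over [start, stop)."""
    start: int
    stop: int
    last_claimed: Optional[int] = None

    def try_claim(self, position: int) -> bool:
        # A claim fails once the position falls outside the primary restriction.
        if position >= self.stop:
            return False
        self.last_claimed = position
        return True

    def try_split(self, fraction_of_remainder: float) -> Optional[Tuple[int, int]]:
        """Shrink the primary restriction and return the residual, or None."""
        cur = self.start - 1 if self.last_claimed is None else self.last_claimed
        # The element currently being processed always stays in the primary.
        split_at = cur + max(1, int((self.stop - cur) * fraction_of_remainder))
        if split_at >= self.stop:
            return None
        residual = (split_at, self.stop)
        self.stop = split_at
        return residual


def run_bundle(tracker: ToyOffsetTracker,
               checkpoint_after: int) -> Tuple[List[int], Optional[Tuple[int, int]]]:
    """Process offsets until a claim fails.

    The simulated runner issues one checkpoint (fraction_of_remainder == 0)
    after `checkpoint_after` elements have been processed.
    """
    processed: List[int] = []
    residual = None
    position = tracker.start
    while tracker.try_claim(position):
        processed.append(position)
        position += 1
        if residual is None and len(processed) == checkpoint_after:
            residual = tracker.try_split(0.0)
    return processed, residual
```

Calling run_bundle(ToyOffsetTracker(0, 10**9), 3) processes offsets 0, 1 and 2 and returns the residual (3, 10**9), which a runner could reschedule as a new bundle. Without the checkpoint request, the loop would keep claiming offsets until the very large end of the range, which mirrors the "running forever" behavior the issue describes.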
--
This message was sent by Atlassian Jira
(v8.3.4#803005)