[ 
https://issues.apache.org/jira/browse/BEAM-11998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334440#comment-17334440
 ] 

zhao commented on BEAM-11998:
-----------------------------

Thanks for the response. I tried it, but no luck. Perhaps my code is incorrect?
Any suggestions would be greatly appreciated.

Here is the running command:
python -m sample --runner=FlinkRunner --flink_version 1.12 
--flink_master=myMaster --environment_type=DOCKER --save_main_session 
--flink_submit_uber_jar --experiments=use_deprecated_read --streaming

Here is the demo code:
from __future__ import absolute_import

import logging
import typing
import argparse
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka,WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run_pipeline(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    p = beam.Pipeline(options=pipeline_options)

    # NOTE: boot_server is not defined in this snippet; it must hold the
    # Kafka bootstrap address (host:port).
    (p
     | 'read from kafka' >> ReadFromKafka(
         consumer_config={'bootstrap.servers': boot_server},
         topics=['test003'])
     | beam.Map(print))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run_pipeline()

Here is the running job graph. You can clearly see that the reading slot has finished.
!read.png!

> Portable runners should be able to issue checkpoints to Splittable DoFn
> -----------------------------------------------------------------------
>
>                 Key: BEAM-11998
>                 URL: https://issues.apache.org/jira/browse/BEAM-11998
>             Project: Beam
>          Issue Type: New Feature
>          Components: cross-language, runner-flink, runner-spark
>            Reporter: Boyuan Zhang
>            Priority: P2
>         Attachments: read.png, read.png
>
>
> To execute an unbounded Splittable DoFn over the Fn API in streaming mode
> properly, portable runners should regularly issue a split
> (ProcessBundleSplitRequest with fraction_of_remainder > 0) or simply a
> checkpoint (ProcessBundleSplitRequest with fraction_of_remainder == 0) to
> the SDK, so that the current bundle finishes processing instead of running
> forever.
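
To make the checkpoint idea in the issue description concrete, here is a minimal plain-Python sketch. It does not use Beam or the Fn API at all: OffsetRange, ToyRestrictionTracker, and process_bundle are hypothetical names invented for illustration, and the "runner" is reduced to a counter that asks for a checkpoint after a fixed number of elements. It only shows how a split with fraction_of_remainder == 0 lets a bundle over a very large range finish, leaving a residual range to be resumed later.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class OffsetRange:
    """Half-open range [start, stop) of offsets still to be read."""
    start: int
    stop: int


class ToyRestrictionTracker:
    """Hypothetical stand-in for an SDK-side restriction tracker."""

    def __init__(self, restriction: OffsetRange) -> None:
        self.restriction = restriction
        self.position = restriction.start  # next offset that may be claimed

    def try_claim(self, offset: int) -> bool:
        # Claiming fails once the offset falls outside the (possibly
        # shrunk) restriction, which ends the processing loop.
        if offset >= self.restriction.stop:
            return False
        self.position = offset + 1
        return True

    def try_split(self, fraction_of_remainder: float) -> Optional[OffsetRange]:
        # Keep `fraction_of_remainder` of the unclaimed work in the primary
        # and hand the rest back as the residual. A fraction of 0 is a
        # checkpoint: the primary ends at the current position.
        remaining = self.restriction.stop - self.position
        split_at = self.position + int(remaining * fraction_of_remainder)
        if split_at >= self.restriction.stop:
            return None  # nothing left to give away
        residual = OffsetRange(split_at, self.restriction.stop)
        self.restriction = OffsetRange(self.restriction.start, split_at)
        return residual


def process_bundle(
    tracker: ToyRestrictionTracker, checkpoint_after: int
) -> Tuple[List[int], Optional[OffsetRange]]:
    """Read offsets until the toy runner's checkpoint ends the bundle."""
    emitted: List[int] = []
    residual: Optional[OffsetRange] = None
    offset = tracker.position
    while tracker.try_claim(offset):
        emitted.append(offset)
        offset += 1
        # Toy "runner": request a checkpoint after N elements.
        if residual is None and len(emitted) == checkpoint_after:
            residual = tracker.try_split(0.0)
    return emitted, residual


if __name__ == "__main__":
    # A very large stop offset stands in for an unbounded source.
    tracker = ToyRestrictionTracker(OffsetRange(0, 10**12))
    emitted, residual = process_bundle(tracker, checkpoint_after=3)
    print(emitted)
    print(residual)
```

Without the try_split call, the loop above would keep claiming offsets until the end of the range, which for a truly unbounded source means the bundle never completes. That is the behavior the issue asks portable runners to avoid by checkpointing regularly.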



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
