Hi Dian,

Thanks for your help, again!
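One note for the archives, in case it helps others: besides `sink.rolling-policy.rollover-interval`, the Rolling Policy section [1] linked below also lists `sink.rolling-policy.file-size` and `sink.rolling-policy.check-interval`. Here is a minimal sketch of the sink side combining them (the table name and the option values are illustrative, not defaults):

```python
# Minimal sketch: same setup as in the fixed job below, with the extra
# rolling-policy options from [1]. Values here are illustrative only.
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Checkpointing is still required: in-progress ".part-xxx" files are only
# committed (made visible) when a checkpoint completes.
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "10s")

# Roll a new part file every 10s, or once the current file reaches 1MB,
# whichever comes first; both conditions are checked every 5s.
table_env.execute_sql("""
    CREATE TABLE csv_sink (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output',
        'sink.rolling-policy.rollover-interval' = '10s',
        'sink.rolling-policy.file-size' = '1MB',
        'sink.rolling-policy.check-interval' = '5s'
    )
""")
```

If I understand the docs correctly, files roll whenever either threshold is hit, and the rolled files become visible in /tmp/output once a checkpoint completes.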
Best,
Yik San

On Wed, Apr 21, 2021 at 8:39 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Yik San,
>
> You need to set the rolling policy for the filesystem connector. You could
> refer to the Rolling Policy section [1] for more details.
>
> Actually, there is output: if you run `ls -la /tmp/output/`, you will see
> several files named ".part-xxx". They don't show up in a plain `ls` because
> in-progress part files start with a dot, which marks them as hidden.
>
> For your job, you need to set `execution.checkpointing.interval` in the
> configuration and `sink.rolling-policy.rollover-interval` in the properties
> of the filesystem connector.
>
> The job will look like the following:
>
> ```python
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
>
> table_env.execute_sql("""
>     CREATE TABLE datagen (
>         id INT,
>         data STRING
>     ) WITH (
>         'connector' = 'datagen',
>         'rows-per-second' = '1'
>     )
> """)
>
> table_env.execute_sql("""
>     CREATE TABLE print (
>         id INT,
>         data STRING
>     ) WITH (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '/tmp/output',
>         'sink.rolling-policy.rollover-interval' = '10s'
>     )
> """)
>
> table_env.execute_sql("""
>     INSERT INTO print
>     SELECT id, data
>     FROM datagen
> """).wait()
> ```
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#rolling-policy
>
> On Apr 21, 2021, at 7:44 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
> The question is cross-posted on Stack Overflow:
> https://stackoverflow.com/questions/67195207/flink-not-able-to-sink-a-stream-into-csv
>
> I am trying to sink a stream to the filesystem in CSV format using PyFlink,
> but it does not work.
>
> ```python
> # stream_to_csv.py
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> table_env.execute_sql("""
>     CREATE TABLE datagen (
>         id INT,
>         data STRING
>     ) WITH (
>         'connector' = 'datagen',
>         'rows-per-second' = '1'
>     )
> """)
>
> table_env.execute_sql("""
>     CREATE TABLE print (
>         id INT,
>         data STRING
>     ) WITH (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '/tmp/output'
>     )
> """)
>
> table_env.execute_sql("""
>     INSERT INTO print
>     SELECT id, data
>     FROM datagen
> """).wait()
> ```
>
> To run the script:
>
> ```
> $ python stream_to_csv.py
> ```
>
> I expect records to land in the /tmp/output folder, but that doesn't happen:
>
> ```
> $ ls /tmp/output
> (nothing shown here)
> ```
>
> Anything I missed?
>
> Best,
> Yik San