Hi Dian,

Thanks for your help, again!

Best,
Yik San

On Wed, Apr 21, 2021 at 8:39 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Yik San,
>
> You need to set the rolling policy for the filesystem connector. You can
> refer to the Rolling Policy section [1] for more details.
>
> Actually, there is output: if you run `ls -la /tmp/output/`, you will see
> several hidden files named ".part-xxx".
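
To run the same check from Python instead of the shell, here is a minimal sketch. The helper name `split_part_files` is made up for illustration, and treating every dot-prefixed file as an in-progress part file is an assumption based on the ".part-xxx" names mentioned above.

```python
import os


def split_part_files(output_dir):
    """Return (hidden, visible) file names found directly in output_dir.

    Assumption: dot-prefixed files are the ".part-xxx" files the sink is
    still writing; everything else is treated as visible output.
    """
    if not os.path.isdir(output_dir):
        return [], []
    names = sorted(
        name for name in os.listdir(output_dir)
        if os.path.isfile(os.path.join(output_dir, name))
    )
    hidden = [n for n in names if n.startswith(".")]
    visible = [n for n in names if not n.startswith(".")]
    return hidden, visible


if __name__ == "__main__":
    hidden, visible = split_part_files("/tmp/output")
    print("hidden (probably in progress):", hidden)
    print("visible:", visible)
```

If the hidden list is non-empty while the visible list stays empty, the job is probably writing data but never rolling the files, which matches the behaviour described in this thread.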
>
> For your job, you need to set `execution.checkpointing.interval` in
> the configuration and `sink.rolling-policy.rollover-interval` in the
> properties of the filesystem connector.
>
>
> The job will look like the following:
> ```python
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings = (
>     EnvironmentSettings.new_instance()
>     .in_streaming_mode()
>     .use_blink_planner()
>     .build()
> )
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> # Enable checkpointing every 10 seconds.
> table_env.get_config().get_configuration().set_string(
>     "execution.checkpointing.interval", "10s"
> )
>
> table_env.execute_sql("""
>     CREATE TABLE datagen (
>         id INT,
>         data STRING
>     ) WITH (
>         'connector' = 'datagen',
>         'rows-per-second' = '1'
>     )
> """)
>
> table_env.execute_sql("""
>     CREATE TABLE print (
>         id INT,
>         data STRING
>     ) WITH (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '/tmp/output',
>         'sink.rolling-policy.rollover-interval' = '10s'
>     )
> """)
>
> table_env.execute_sql("""
> INSERT INTO print
> SELECT id, data
> FROM datagen
> """).wait()
> ```
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#rolling-policy
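
Once files start rolling, a quick way to confirm that rows are arriving is to count the CSV records in the output directory. This is only a sketch: `count_csv_rows` is a hypothetical helper, and it assumes that finished files are the ones whose names do not start with a dot and that each one is plain CSV without a header row.

```python
import csv
import os


def count_csv_rows(output_dir):
    """Count CSV rows across the non-hidden files directly in output_dir.

    Assumption: dot-prefixed files are still in progress and are skipped.
    """
    total = 0
    if not os.path.isdir(output_dir):
        return total
    for name in sorted(os.listdir(output_dir)):
        path = os.path.join(output_dir, name)
        if name.startswith(".") or not os.path.isfile(path):
            continue
        with open(path, newline="") as f:
            total += sum(1 for _ in csv.reader(f))
    return total


if __name__ == "__main__":
    print("rows in finished files:", count_csv_rows("/tmp/output"))
```

With `'rows-per-second' = '1'` in the datagen table, the count should keep growing between runs while the job is active.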
>
> On Apr 21, 2021, at 7:44 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
> This question is cross-posted on Stack Overflow:
> https://stackoverflow.com/questions/67195207/flink-not-able-to-sink-a-stream-into-csv
>
> I am trying to sink a stream into the filesystem in CSV format using
> PyFlink, but it does not work.
>
> ```python
> # stream_to_csv.py
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings = (
>     EnvironmentSettings.new_instance()
>     .in_streaming_mode()
>     .use_blink_planner()
>     .build()
> )
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> table_env.execute_sql("""
>     CREATE TABLE datagen (
>         id INT,
>         data STRING
>     ) WITH (
>         'connector' = 'datagen',
>         'rows-per-second' = '1'
>     )
> """)
>
> table_env.execute_sql("""
>     CREATE TABLE print (
>         id INT,
>         data STRING
>     ) WITH (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '/tmp/output'
>     )
> """)
>
> table_env.execute_sql("""
> INSERT INTO print
> SELECT id, data
> FROM datagen
> """).wait()
> ```
>
> To run the script:
>
> ```
> $ python stream_to_csv.py
> ```
>
> I expect records to be written to the /tmp/output folder, but that doesn't
> happen.
>
> ```
> $ ~ ls /tmp/output
> (nothing shown here)
> ```
>
> Am I missing anything?
>
> Best,
> Yik San
>
>
>
