Thank you, it works.

Best,
Yik San Chan

On Mon, Mar 15, 2021 at 5:30 PM David Anderson <dander...@apache.org> wrote:

> The first time, you ran it without specifying the parallelism, so you got
> the default parallelism, which is greater than 1 (probably 4 or 8,
> depending on how many cores your computer has).
>
> Flink is designed to be scalable, and to achieve that, parallel instances
> of an operator, such as a sink, are decoupled from one another. Imagine,
> for example, a large cluster with 100s or 1000s of nodes. For this to work
> well, each instance needs to write to its own file.
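
The behaviour described above can be imitated in plain Python. This is only a toy sketch, not Flink code: `run_word_count` and the `part-task-N-file-0` names are hypothetical, and `zlib.crc32` stands in for whatever key partitioning Flink really uses. It shows that each parallel sink instance ends up with its own file, and that a parallelism of 1 yields exactly one file.

```python
import zlib
from collections import Counter, defaultdict


def run_word_count(words, parallelism):
    """Simulate a grouped count followed by one output file per sink instance.

    Returns a dict mapping a (hypothetical) part-file name to its lines.
    """
    counts = Counter(words)
    files = defaultdict(list)
    for word, count in sorted(counts.items()):
        # Route each key to one sink instance using a stable hash.
        # (crc32 is an assumption for illustration, not Flink's partitioner.)
        task = zlib.crc32(word.encode("utf-8")) % parallelism
        files[f"part-task-{task}-file-0"].append(f"{word},{count}")
    return dict(files)


words = ["flink", "pyflink", "flink"]
single = run_word_count(words, parallelism=1)  # all rows land in one file
multi = run_word_count(words, parallelism=8)   # rows may be spread over files
```

For the SQL-based job quoted below, one way to get a single output file should be to set the default parallelism before executing the INSERT, for example `table_env.get_config().get_configuration().set_string("parallelism.default", "1")`.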
>
> The commas were changed to tabs because you specified
> .field_delimiter('\t').
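
The effect of the delimiter can be seen without Flink at all. The sketch below uses Python's standard `csv` module as a stand-in for the sink's CSV format; `render` is a hypothetical helper, and the rows are the ones from the quoted output.

```python
import csv
import io


def render(rows, delimiter):
    """Serialize rows to text using the given field delimiter."""
    buf = io.StringIO()
    writer = csv.writer(buf, delimiter=delimiter, lineterminator="\n")
    writer.writerows(rows)
    return buf.getvalue()


rows = [("flink", 2), ("pyflink", 1)]
comma_output = render(rows, ",")   # fields separated by commas
tab_output = render(rows, "\t")    # fields separated by tab characters
```

When printed in a terminal, the tab-separated version looks like the words and counts are separated by plain whitespace.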
>
>
> Regards,
>
> David
>
> On Mon, Mar 15, 2021 at 9:49 AM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> The question is cross-posted on StackOverflow
>> https://stackoverflow.com/questions/66634813/why-does-flink-filesystem-sink-splits-into-multiple-files
>> .
>>
>> I want to use Flink to read from an input file, do some aggregation, and
>> write the result to an output file. The job is in batch mode. See
>> `wordcount.py` below:
>>
>> ```python
>> from pyflink.table import EnvironmentSettings, BatchTableEnvironment
>>
>> # https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
>>
>> env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
>> table_env = BatchTableEnvironment.create(environment_settings=env_settings)
>>
>> my_source_ddl = """
>>     create table mySource (
>>         word VARCHAR
>>     ) with (
>>         'connector' = 'filesystem',
>>         'format' = 'csv',
>>         'path' = '/tmp/input'
>>     )
>> """
>>
>> my_sink_ddl = """
>>     create table mySink (
>>         word VARCHAR,
>>         `count` BIGINT
>>     ) with (
>>         'connector' = 'filesystem',
>>         'format' = 'csv',
>>         'path' = '/tmp/output'
>>     )
>> """
>>
>> transform_dml = """
>> INSERT INTO mySink
>> SELECT word, COUNT(1) FROM mySource GROUP BY word
>> """
>>
>> table_env.execute_sql(my_source_ddl)
>> table_env.execute_sql(my_sink_ddl)
>> table_env.execute_sql(transform_dml).wait()
>>
>> # before run: echo -e  "flink\npyflink\nflink" > /tmp/input
>> # after run: cat /tmp/output
>> ```
>>
>> Before running `python wordcount.py`, I run
>> `echo -e "flink\npyflink\nflink" > /tmp/input` to make sure the data
>> exists in /tmp/input. However, after the run, there are two files in
>> /tmp/output:
>>
>> ```
>> > ls /tmp/output
>> part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
>> part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
>> > cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
>> pyflink,1
>> > cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
>> flink,2
>> ```
>>
>> I expected a single file, /tmp/output, with this content:
>>
>> ```
>> pyflink,1
>> flink,2
>> ```
>>
>> I actually derived the Python program above from the one below, which
>> does produce a single file, /tmp/output.
>>
>> ```python
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
>> from pyflink.table.expressions import lit
>>
>> # https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
>>
>> exec_env = ExecutionEnvironment.get_execution_environment()
>> exec_env.set_parallelism(1)
>> t_config = TableConfig()
>> t_env = BatchTableEnvironment.create(exec_env, t_config)
>>
>> t_env.connect(FileSystem().path('/tmp/input')) \
>>     .with_format(OldCsv()
>>                  .field('word', DataTypes.STRING())) \
>>     .with_schema(Schema()
>>                  .field('word', DataTypes.STRING())) \
>>     .create_temporary_table('mySource')
>>
>> t_env.connect(FileSystem().path('/tmp/output')) \
>>     .with_format(OldCsv()
>>                  .field_delimiter('\t')
>>                  .field('word', DataTypes.STRING())
>>                  .field('count', DataTypes.BIGINT())) \
>>     .with_schema(Schema()
>>                  .field('word', DataTypes.STRING())
>>                  .field('count', DataTypes.BIGINT())) \
>>     .create_temporary_table('mySink')
>>
>> tab = t_env.from_path('mySource')
>> tab.group_by(tab.word) \
>>    .select(tab.word, lit(1).count) \
>>    .execute_insert('mySink').wait()
>> ```
>>
>> Running this version generates a single file, /tmp/output. Note that its
>> fields are not separated by commas.
>>
>> ```
>> > cat /tmp/output
>> flink 2
>> pyflink 1
>> ```
>>
>> Any idea why? Thanks!
>>
>> Best,
>> Yik San Chan
>>
>
