Thank you, it works.

Best,
Yik San Chan
On Mon, Mar 15, 2021 at 5:30 PM David Anderson <dander...@apache.org> wrote:

The first time, you ran it without specifying the parallelism, so you got the default parallelism -- which is greater than 1 (probably 4 or 8, depending on how many cores your computer has).

Flink is designed to be scalable, and to achieve that, parallel instances of an operator, such as a sink, are decoupled from one another. Imagine, for example, a large cluster with 100s or 1000s of nodes. For this to work well, each instance needs to write to its own file.

The commas were changed to tabs because you specified .field_delimiter('\t').

Regards,

David

On Mon, Mar 15, 2021 at 9:49 AM Yik San Chan <evan.chanyik...@gmail.com> wrote:

The question is cross-posted on StackOverflow:
https://stackoverflow.com/questions/66634813/why-does-flink-filesystem-sink-splits-into-multiple-files

I want to use Flink to read from an input file, do some aggregation, and write the result to an output file. The job runs in batch mode. See `wordcount.py` below:

```python
from pyflink.table import EnvironmentSettings, BatchTableEnvironment

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

my_source_ddl = """
create table mySource (
    word VARCHAR
) with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
    word VARCHAR,
    `count` BIGINT
) with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = '/tmp/output'
)
"""

transform_dml = """
INSERT INTO mySink
SELECT word, COUNT(1) FROM mySource GROUP BY word
"""

table_env.execute_sql(my_source_ddl)
table_env.execute_sql(my_sink_ddl)
table_env.execute_sql(transform_dml).wait()

# before run: echo -e "flink\npyflink\nflink" > /tmp/input
# after run: cat /tmp/output
```

Before running `python wordcount.py`, I run `echo -e "flink\npyflink\nflink" > /tmp/input` to make sure data exists in /tmp/input. However, after the run, there are two files in /tmp/output:

```
> ls /tmp/output
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
pyflink,1
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
flink,2
```

Instead, I expected a single file /tmp/output with this content:

```
pyflink,1
flink,2
```

In fact, I wrote the above Python program by adapting the one below, which does produce a single /tmp/output file.
```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

Running this version generates a single /tmp/output file. Note the fields are tab-delimited rather than comma-delimited:

```
> cat /tmp/output
flink	2
pyflink	1
```

Any idea why? Thanks!

Best,
Yik San Chan
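Following David's first point, the DDL-based version can also be made to write a single part file by pinning the default parallelism to 1. A minimal sketch, assuming PyFlink 1.12; `parallelism.default` is Flink's standard default-parallelism option, but whether setting it through the table config takes effect in a given deployment is an assumption worth verifying:

```python
from pyflink.table import EnvironmentSettings, BatchTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

# Assumption: this forces every operator, including the filesystem sink,
# to run as a single parallel instance, so only one part file is written.
table_env.get_config().get_configuration().set_string("parallelism.default", "1")
```

The trade-off is that the job no longer scales across cores, and the filesystem connector still writes its part file inside the /tmp/output directory; only the legacy sink in the second program writes /tmp/output as a single plain file.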
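Conversely, per David's second point, the DDL version can emit tab-delimited output by configuring the CSV format's delimiter. A sketch of an adjusted sink DDL, assuming the `csv.field-delimiter` option of Flink's CSV format, where U&'\0009' is the SQL unicode-escape literal for a tab character:

```python
# Hypothetical variant of my_sink_ddl from the first program; a raw string
# keeps the backslash in the SQL unicode escape intact.
my_sink_ddl = r"""
create table mySink (
    word VARCHAR,
    `count` BIGINT
) with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = '/tmp/output',
    'csv.field-delimiter' = U&'\0009'
)
"""
```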