Re: Why does Flink FileSystem sink split into multiple files

2021-03-15 Thread Yik San Chan
Thank you, it works.

Best,
Yik San Chan



Re: Why does Flink FileSystem sink split into multiple files

2021-03-15 Thread David Anderson
The first time, you ran it without specifying the parallelism, so you got
the default parallelism -- which is greater than 1 (probably 4 or 8,
depending on how many cores your computer has).

Flink is designed to be scalable, and to achieve that, parallel instances
of an operator, such as a sink, are decoupled from one another. Imagine,
for example, a large cluster with 100s or 1000s of nodes. For this to work
well, each instance needs to write to its own file.
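What this looks like mechanically can be sketched in plain Python (a toy illustration only: the hash-based task assignment and the file naming below merely mimic what you see on disk, they are not Flink's actual key-group hashing or sink internals):

```python
import hashlib
import os
import tempfile
import uuid

def write_with_parallelism(rows, parallelism, out_dir):
    """Toy model of a parallel file sink: each 'subtask' owns one part file."""
    run_id = uuid.uuid4()
    files = {}
    for word, count in rows:
        # Stand-in for how Flink distributes keys across parallel subtasks;
        # the real assignment is Flink's key-group hashing, not md5.
        task = int(hashlib.md5(word.encode()).hexdigest(), 16) % parallelism
        name = f"part-{run_id}-cp-0-task-{task}-file-0"
        files.setdefault(name, []).append(f"{word},{count}")
    for name, lines in files.items():
        with open(os.path.join(out_dir, name), "w") as f:
            f.write("\n".join(lines) + "\n")
    return sorted(files)

out_dir = tempfile.mkdtemp()
# With parallelism > 1 the two words can land in different part files;
# with parallelism 1 everything goes into a single file.
print(write_with_parallelism([("pyflink", 1), ("flink", 2)], 8, out_dir))
print(write_with_parallelism([("pyflink", 1), ("flink", 2)], 1, out_dir))
```

With parallelism 1 every key maps to subtask 0, so exactly one part file is produced; with a higher parallelism the keys can spread across several `part-...-task-N-file-0` files, matching what you observed.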

The commas were changed to tabs because you specified `.field_delimiter('\t')`.
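For completeness: in the DDL-based version, one way to get a single output file is to force the job's parallelism down to 1 via the table configuration. A sketch against the PyFlink 1.12 setup from the question (the option name `table.exec.resource.default-parallelism` is the Blink planner's setting; treat the exact key as something to verify against your Flink version):

```python
# Sketch: assumes table_env was created as in the question, via
# BatchTableEnvironment.create(environment_settings=env_settings).
table_env.get_config().get_configuration().set_string(
    "table.exec.resource.default-parallelism", "1"
)
```

Set this before running the INSERT, and the filesystem sink should write a single part file under /tmp/output.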


Regards,

David


Why does Flink FileSystem sink split into multiple files

2021-03-15 Thread Yik San Chan
The question is cross-posted on StackOverflow:
https://stackoverflow.com/questions/66634813/why-does-flink-filesystem-sink-splits-into-multiple-files

I want to use Flink to read from an input file, do some aggregation, and
write the result to an output file. The job is in batch mode. See
`wordcount.py` below:

```python
from pyflink.table import EnvironmentSettings, BatchTableEnvironment

#
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

my_source_ddl = """
create table mySource (
    word VARCHAR
) with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
    word VARCHAR,
    `count` BIGINT
) with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = '/tmp/output'
)
"""

transform_dml = """
INSERT INTO mySink
SELECT word, COUNT(1) FROM mySource GROUP BY word
"""

table_env.execute_sql(my_source_ddl)
table_env.execute_sql(my_sink_ddl)
table_env.execute_sql(transform_dml).wait()

# before run: echo -e  "flink\npyflink\nflink" > /tmp/input
# after run: cat /tmp/output
```

Before running `python wordcount.py`, I run
`echo -e "flink\npyflink\nflink" > /tmp/input` to make sure data exists in
/tmp/input. However, after the run, there are two files in /tmp/output:

```
> ls /tmp/output
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> cat
/tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
pyflink,1
> cat
/tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
flink,2
```

However, I expected a single file /tmp/output with this content:

```
pyflink,1
flink,2
```

Actually, I got the above Python program by adapting the one below, which
produces a single file /tmp/output.

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

#
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

Running this version generates a single file /tmp/output. Note that its
output is tab-delimited rather than comma-delimited.

```
> cat /tmp/output
flink 2
pyflink 1
```

Any idea why? Thanks!

Best,
Yik San Chan