How to stream CSV from S3?

2020-07-27 Thread John Smith
Hi, using Flink 1.10

1- How do we go about reading CSV files that are copied to s3 buckets?
2- Is there a source that can tail S3 and start reading a CSV when it is
copied to S3?
3- Is that part of the table APIs?


Re: How to stream CSV from S3?

2020-07-27 Thread Jingsong Li
Hi John,

Do you mean you want to read S3 CSV files using partition/bucket pruning?

If you are just using the DataSet API, you can use CsvInputFormat to read CSV files.

If you want to use the Table/SQL API: in 1.10, the CSV format in the Table API
does not support partitioned tables, so the only option is to specify the
partition/bucket path explicitly and read a single directory.

In 1.11, the Table/SQL filesystem connector with the CSV format supports
partitioned tables, with complete partition semantics.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
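For the 1.11 route, a partitioned filesystem table over S3 can be declared in SQL roughly like this (a sketch: the bucket path, column names, and partition column are illustrative, not taken from the thread):

```sql
CREATE TABLE csv_source (
  user_id STRING,
  event   STRING,
  dt      STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path'      = 's3://my-bucket/logs',
  'format'    = 'csv'
);
```

A query with a predicate on the partition column (e.g. `WHERE dt = '2020-07-27'`) then only reads the matching partition directories.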

Best,
Jingsong


-- 
Best, Jingsong Lee


Re: How to stream CSV from S3?

2020-07-28 Thread John Smith
Basically, I want to "monitor" a bucket on S3 and, for every file that gets
created in that bucket, read it and stream it.

If I understand correctly, I can just use env.readCsvFile() and configure it
to continuously read a folder path?




Re: How to stream CSV from S3?

2020-07-28 Thread Jingsong Li
Yes, you can try `StreamExecutionEnvironment.readFile(RowCsvInputFormat,
filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)`. (And
wrap it in a table if you want.)


-- 
Best, Jingsong Lee


Re: How to stream CSV from S3?

2020-07-28 Thread John Smith
Also, this is where I find the docs confusing: in the "connectors" section,
the file system connector isn't listed under DataStream, but env.readCsvFile
seems like it can do the trick?



Re: How to stream CSV from S3?

2020-07-28 Thread Jingsong Li
- `env.readCsvFile` belongs to the DataSet API; it reads the full data set
once, in batch mode.
- `streamEnv.readFile(RowCsvInputFormat, filePath,
FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)` can monitor a
directory and keep reading new files in streaming mode.


-- 
Best, Jingsong Lee


Re: How to stream CSV from S3?

2020-07-28 Thread John Smith
Hi, is there an example of how RowCsvInputFormat is initialized?



Re: How to stream CSV from S3?

2020-07-30 Thread Arvid Heise
Hi John,

I found an example on SO [1] in Scala.

[1] https://stackoverflow.com/a/52093079/10299342


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: How to stream CSV from S3?

2020-07-31 Thread John Smith
Hi, yes, it works :)

For the Java guys...

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.types.Row;

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

String path = "file:///foo/bar";

// One TypeInformation per CSV column, in order.
TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
};

RowCsvInputFormat csvFormat =
    new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setSkipFirstLineAsHeader(true);

// Re-scan the path every 1000 ms and pick up new files as they appear.
DataStreamSource<Row> lines = env.readFile(csvFormat, path,
    FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);

lines.print();

env.execute();

