Re: [EXTERNAL] Re: flink s3[parquet] -> s3[iceberg]

2023-11-24 Thread Feng Jin
Hi Dan

I think Flink SQL should be able to meet your needs.

You can write a Flink JAR program that accepts different directories, schemas,
mappings, and sink tables as parameters and generates the DDL and DML from them.

Assuming you have two directories:

directory1 -> f1, f2, f3, f4 -> iceberg1
directory2 -> f1, f2, f3 -> iceberg2

The DDL can be generated roughly as follows.

CREATE TABLE s3_table1 (
  f1 varchar,
  f2 varchar,
  f3 varchar,
  f4 varchar
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://dir1',
  'format' = 'parquet',
  ...
);

CREATE TABLE s3_table2 (
  f1 varchar,
  f2 varchar,
  f3 varchar
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://dir2',
  'format' = 'parquet',
  ...
);


Based on your mapping, select the fields you need and then generate the
DML.

INSERT INTO iceberg_catalog.iceberg_database1.tb1
SELECT f1, f2, f3 FROM s3_table1;

INSERT INTO iceberg_catalog.iceberg_database2.tb2
SELECT f1, f2 FROM s3_table2;
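
If you want both pipelines to run as a single job, recent Flink versions
(1.15+) can group the INSERTs into a statement set. A minimal sketch,
reusing the placeholder names above:

EXECUTE STATEMENT SET
BEGIN
  -- both INSERTs are planned and submitted together as one job
  INSERT INTO iceberg_catalog.iceberg_database1.tb1
    SELECT f1, f2, f3 FROM s3_table1;
  INSERT INTO iceberg_catalog.iceberg_database2.tb2
    SELECT f1, f2 FROM s3_table2;
END;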


Of course, this is just my understanding of your requirements; I don't know
whether it matches your scenario.


Best regards,
Feng


On Fri, Nov 24, 2023 at 3:02 AM Oxlade, Dan wrote:

> Thanks Feng,
>
> I think my challenge (and why I expected I’d need to use Java) is that
> there will be parquet files with different schemas landing in the s3 bucket
> - so I don’t want to hard-code the schema in a sql table definition.
>
> I’m not sure if this is even possible? Maybe I would have to write a job
> that accepts the schema, directory and iceberg target table as params and
> start instances of the job through the job api.
>
> Unless reading the parquet to a temporary table doesn’t need the schema
> definition? I couldn't really work things out from the links.
>
> Dan
> --
> *From:* Feng Jin 
> *Sent:* Thursday, November 23, 2023 6:49:11 PM
> *To:* Oxlade, Dan 
> *Cc:* user@flink.apache.org 
> *Subject:* [EXTERNAL] Re: flink s3[parquet] -> s3[iceberg]
>
> Hi Oxlade
>
> I think Flink SQL can conveniently fulfill your requirements.
>
> For S3 Parquet files, you can create a temporary table using a filesystem
> connector[1].
> For Iceberg tables, Flink SQL can easily integrate with the Iceberg
> catalog[2].
>
> Therefore, you can use Flink SQL to export S3 files to Iceberg.
>
> If you only need field mapping or transformation, I believe using Flink
> SQL + UDF (User-Defined Functions) would be sufficient to meet your needs.
>
>
> [1]. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
> [2]. https://iceberg.apache.org/docs/latest/flink-connector/#table-managed-in-hadoop-catalog
>
>
> Best,
> Feng
>
>
> On Thu, Nov 23, 2023 at 11:23 PM Oxlade, Dan wrote:
>
> Hi all,
>
> I’m attempting to create a POC in flink to create a pipeline to stream
> parquet to a data warehouse in iceberg format.
>
> Ideally – I’d like to watch a directory in s3 (minio locally) and stream
> those to iceberg, doing the appropriate schema mapping/translation.
>
> I guess first; does this sound like a crazy idea?
> Assuming not, is anyone able to share examples that might get me going?
> I’ve found lots of iceberg and flink sql examples, but I think I’ll need
> something in java to do the schema mapping. Also, examples reading
> parquet from s3 seem a little hard to come by.
>
> I’m aware I’ll need a catalog; I can use nessie for the prototype. I’m
> also trying to use minio to get this all working locally, but this might
> just be adding complexity at the moment.
>
> TIA
>
> Dan

Re: [EXTERNAL] Re: flink s3[parquet] -> s3[iceberg]

2023-11-23 Thread Oxlade, Dan
Thanks Feng,

I think my challenge (and why I expected I’d need to use Java) is that there 
will be parquet files with different schemas landing in the s3 bucket - so I 
don’t want to hard-code the schema in a sql table definition.

I’m not sure if this is even possible? Maybe I would have to write a job that 
accepts the schema, directory and iceberg target table as params and start 
instances of the job through the job api.

Unless reading the parquet to a temporary table doesn’t need the schema
definition? I couldn't really work things out from the links.

Dan

From: Feng Jin 
Sent: Thursday, November 23, 2023 6:49:11 PM
To: Oxlade, Dan 
Cc: user@flink.apache.org 
Subject: [EXTERNAL] Re: flink s3[parquet] -> s3[iceberg]

Hi Oxlade

I think Flink SQL can conveniently fulfill your requirements.

For S3 Parquet files, you can create a temporary table using a filesystem
connector[1].
For Iceberg tables, Flink SQL can easily integrate with the Iceberg catalog[2].

Therefore, you can use Flink SQL to export S3 files to Iceberg.

If you only need field mapping or transformation, I believe using Flink SQL + 
UDF (User-Defined Functions) would be sufficient to meet your needs.


[1]. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
[2]. https://iceberg.apache.org/docs/latest/flink-connector/#table-managed-in-hadoop-catalog


Best,
Feng


On Thu, Nov 23, 2023 at 11:23 PM Oxlade, Dan wrote:

Hi all,

I’m attempting to create a POC in flink to create a pipeline to stream parquet
to a data warehouse in iceberg format.

Ideally – I’d like to watch a directory in s3 (minio locally) and stream those
to iceberg, doing the appropriate schema mapping/translation.

I guess first; does this sound like a crazy idea?
Assuming not, is anyone able to share examples that might get me going? I’ve
found lots of iceberg and flink sql examples, but I think I’ll need something in
java to do the schema mapping. Also, examples reading parquet from s3 seem a
little hard to come by.

I’m aware I’ll need a catalog; I can use nessie for the prototype. I’m also
trying to use minio to get this all working locally, but this might just be
adding complexity at the moment.

TIA

Dan


Re: flink s3[parquet] -> s3[iceberg]

2023-11-23 Thread Feng Jin
Hi Oxlade

I think Flink SQL can conveniently fulfill your requirements.

For S3 Parquet files, you can create a temporary table using a filesystem
connector[1].
For Iceberg tables, Flink SQL can easily integrate with the Iceberg
catalog[2].
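
For example, a minimal sketch of both pieces, assuming placeholder bucket
paths and the column names from your files; the monitor interval enables the
directory watching described in [1]:

CREATE TEMPORARY TABLE parquet_source (
  f1 varchar,
  f2 varchar,
  f3 varchar
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/incoming/',  -- placeholder directory
  'format' = 'parquet',
  'source.monitor-interval' = '10s'     -- keep watching for new files
);

CREATE CATALOG iceberg_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',                 -- the hadoop catalog from [2]
  'warehouse' = 's3://my-bucket/warehouse',  -- placeholder warehouse path
  'property-version' = '1'
);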

Therefore, you can use Flink SQL to export S3 files to Iceberg.

If you only need field mapping or transformation, I believe using Flink SQL
+ UDF (User-Defined Functions) would be sufficient to meet your needs.
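
For instance, a sketch of the UDF route: 'com.example.RemapF1' is a
hypothetical Java ScalarFunction you would ship in your job jar, and the
target table name is a placeholder:

CREATE TEMPORARY FUNCTION remap_f1 AS 'com.example.RemapF1';

-- apply the mapping while writing into the Iceberg table
INSERT INTO iceberg_catalog.db1.target_table
SELECT remap_f1(f1), f2, f3 FROM parquet_source;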


[1]. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
[2]. https://iceberg.apache.org/docs/latest/flink-connector/#table-managed-in-hadoop-catalog


Best,
Feng


On Thu, Nov 23, 2023 at 11:23 PM Oxlade, Dan wrote:

> Hi all,
>
> I’m attempting to create a POC in flink to create a pipeline to stream
> parquet to a data warehouse in iceberg format.
>
> Ideally – I’d like to watch a directory in s3 (minio locally) and stream
> those to iceberg, doing the appropriate schema mapping/translation.
>
> I guess first; does this sound like a crazy idea?
> Assuming not, is anyone able to share examples that might get me going?
> I’ve found lots of iceberg and flink sql examples, but I think I’ll need
> something in java to do the schema mapping. Also, examples reading
> parquet from s3 seem a little hard to come by.
>
> I’m aware I’ll need a catalog; I can use nessie for the prototype. I’m
> also trying to use minio to get this all working locally, but this might
> just be adding complexity at the moment.
>
> TIA
>
> Dan


flink s3[parquet] -> s3[iceberg]

2023-11-23 Thread Oxlade, Dan
Hi all,

I'm attempting to create a POC in flink to create a pipeline to stream parquet 
to a data warehouse in iceberg format.

Ideally - I'd like to watch a directory in s3 (minio locally) and stream those 
to iceberg, doing the appropriate schema mapping/translation.

I guess first; does this sound like a crazy idea?
Assuming not, is anyone able to share examples that might get me going? I've
found lots of iceberg and flink sql examples, but I think I'll need something in
java to do the schema mapping. Also, examples reading parquet from s3 seem a
little hard to come by.

I'm aware I'll need a catalog, I can use nessie for the prototype. I'm also 
trying to use minio to get this all working locally but this might just be 
adding complexity at the moment.

TIA
Dan