Hi Ryan,

In my experience, different engines often distribute authentication
information in different ways. Some engines use IAM roles to access S3,
while others use security tokens to authenticate, which means we already
have different Catalog/FileIO configurations at the engine level. Sometimes
this even changes at the application level. S3FileIO and HadoopFileIO are
also interchangeable in the case of S3 (our documentation even states that).

The proposed FlinkFileSystemIO is basically a wrapper around the Hadoop
FileSystem (the implementation is the same as HadoopFileIO, just using the
same classes from different packages). The user can switch between this and
the other FileIO implementations without any other changes. The only
difference/goal here is to provide a mechanism to distribute the
authentication information. It has a very important feature: *Delegation
token support*. So when a job starts with 1000 writers, only a single call
to Kerberos is required, instead of 1000 calls.
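
To make the "switch without any other changes" point concrete, here is a
minimal sketch, not the actual PR code. It uses plain string keys in place
of the CatalogProperties constants so it compiles without Iceberg on the
classpath. The fully qualified name used for the proposed Flink class is a
placeholder of mine; the helper and class names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: shows that swapping the FileIO is a single catalog property change.
final class FileIoSwitchSketch {
  // String values behind the CatalogProperties constants.
  static final String WAREHOUSE_LOCATION = "warehouse"; // CatalogProperties.WAREHOUSE_LOCATION
  static final String URI = "uri";                      // CatalogProperties.URI
  static final String FILE_IO_IMPL = "io-impl";         // CatalogProperties.FILE_IO_IMPL

  static final String HADOOP_FILE_IO = "org.apache.iceberg.hadoop.HadoopFileIO";
  static final String S3_FILE_IO = "org.apache.iceberg.aws.s3.S3FileIO";
  // Assumption: placeholder name for the proposed class, not taken from the PR.
  static final String FLINK_FILE_IO = "org.apache.iceberg.flink.FlinkFileIO";

  // Hypothetical helper: builds the catalog properties for a given FileIO class.
  static Map<String, String> catalogProps(String warehouse, String uri, String fileIoImpl) {
    Map<String, String> props = new HashMap<>(3);
    props.put(WAREHOUSE_LOCATION, warehouse);
    props.put(URI, uri);
    props.put(FILE_IO_IMPL, fileIoImpl);
    return props;
  }
}
```

Calling catalogProps(...) with S3_FILE_IO and then with FLINK_FILE_IO yields
maps that differ only in the "io-impl" entry; the warehouse and URI stay the
same.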

I don't see how this could be achieved in an engine-agnostic way in both
HadoopFileIO and S3FileIO.
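
As a toy illustration of the call-count argument for delegation tokens, the
sketch below counts authentication calls with and without a shared token.
It is not Flink's or Hadoop's API: FakeKdc and both methods are hypothetical
stand-ins, and the "token" is just a string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: simulates how many times the KDC is contacted in each setup.
final class DelegationTokenSketch {
  // Hypothetical stand-in for the Kerberos KDC; counts authentication calls.
  static final class FakeKdc {
    final AtomicInteger calls = new AtomicInteger();

    String authenticate(String principal) {
      int n = calls.incrementAndGet();
      return "token-for-" + principal + "-" + n;
    }
  }

  // Without delegation tokens: every writer authenticates on its own.
  static int callsWithoutDelegation(int writers) {
    FakeKdc kdc = new FakeKdc();
    for (int i = 0; i < writers; i++) {
      kdc.authenticate("writer-" + i);
    }
    return kdc.calls.get();
  }

  // With delegation tokens: the job authenticates once and the resulting
  // token is handed to every writer.
  static int callsWithDelegation(int writers) {
    FakeKdc kdc = new FakeKdc();
    String token = kdc.authenticate("job");
    List<String> writerTokens = new ArrayList<>(writers);
    for (int i = 0; i < writers; i++) {
      writerTokens.add(token); // distribution only, no KDC round trip
    }
    return kdc.calls.get();
  }
}
```

With 1000 writers, the first method contacts the fake KDC 1000 times and the
second contacts it once.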

On Wed, Apr 24, 2024, 00:14 Ryan Blue <b...@tabular.io> wrote:

> What I mean is that the FileIO implementation is determined by the catalog
> and the table, not by the engine. The engine should be able to use any
> valid FileIO implementation.
>
> On Tue, Apr 23, 2024 at 2:31 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Hi Ryan,
>>
>> The intended use of the *FlinkFileSystemIO* is to set it through the
>> *Catalog*, like this:
>>
>>         Map<String, String> props = new HashMap<>(3);
>>         props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
>>         props.put(CatalogProperties.URI, uri);
>>         props.put(CatalogProperties.FILE_IO_IMPL, FlinkFileIO.class.getName());
>>         CatalogLoader.hive("hive", hiveConf, props);
>>
>> If I understand you correctly, this is exactly what you are suggesting. I
>> absolutely agree that setting this through the TableProperties is a no-go.
>> This should be used for Flink applications where the storage layer behind
>> the Iceberg table is also used for other purposes, like checkpointing.
>>
>> Thanks,
>> Peter
>>
>> Ryan Blue <b...@tabular.io> wrote (on Mon, Apr 22, 2024, 23:58):
>>
>>> I don't think introducing a Flink-specific FileIO is a good idea. The
>>> intent of the Java API is for a table to use the FileIO instance that is
>>> supplied by the table object. That puts the responsibility for supplying
>>> a correctly configured FileIO on the catalog, which is the right place
>>> to inject most customization.
>>>
>>> Having a Flink FileIO doesn't fit with that model. You could expose a
>>> generic FileIO from the catalog if you wanted, which would make a lot more
>>> sense. But FileIO is not the same thing as a FileSystem implementation. It
>>> should be used in places where it makes sense to use FileIO and should not
>>> be given the same lifecycle or responsibilities as a FileSystem.
>>>
>>> Ryan
>>>
>>> On Mon, Apr 22, 2024 at 11:00 AM Ferenc Csaky <ferenc.cs...@pm.me.invalid>
>>> wrote:
>>>
>>>> Hi Peter,
>>>>
>>>> I am coming from the Flink side, but at Cloudera we use Iceberg as
>>>> well.
>>>>
>>>> Utilizing the Flink delegation token framework via the Iceberg Java API
>>>> would be great. I think that simplifying the configuration for
>>>> Flink-related cases also has value on its own, and could help to
>>>> eliminate some confusion regarding when/where to set properties.
>>>>
>>>> Regarding the naming, maybe it would be worth being more explicit and
>>>> calling the class `FlinkFSFileIO`? Just to emphasize that "Flink" in
>>>> this context refers to the FS abstraction layer, not the processing
>>>> engine. WDYT?
>>>>
>>>> Looking forward to this change!
>>>>
>>>> Best,
>>>> Ferenc
>>>>
>>>> On 2024/04/19 17:08:23 Péter Váry wrote:
>>>> > Hi Iceberg Team,
>>>> >
>>>> > Flink has its own FileSystem implementation. See:
>>>> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/overview/
>>>> > This FileSystem already has several implementations:
>>>> >
>>>> >    - Hadoop
>>>> >    - Azure
>>>> >    - S3
>>>> >    - Google Cloud Storage
>>>> >    - ...
>>>> >
>>>> > As a general rule in Flink, one should use this FileSystem to consume
>>>> > and persistently store data.
>>>> > If these FileSystems are configured, then Flink makes sure that the
>>>> > configurations are consistent and available for the JM/TM.
>>>> > Also, as an added benefit, delegation tokens are handled and
>>>> > distributed for these FileSystems automatically. See:
>>>> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-delegation-token/
>>>> >
>>>> > In house, some of our new users are struggling with parametrizing
>>>> > HadoopFileIO and S3FileIO for Iceberg, trying to wrap their heads
>>>> > around the fact that they have to provide different configurations
>>>> > for checkpointing and for the Iceberg table storage (even if they are
>>>> > stored in the same bucket, or on the same HDFS cluster).
>>>> >
>>>> > I have created a PR, which provides a FileIO implementation which uses
>>>> > FlinkFileSystem. Very imaginatively I have named it FlinkFileIO. See:
>>>> > https://github.com/apache/iceberg/pull/10151
>>>> >
>>>> > This would allow users to configure the FileSystem only once, and
>>>> > use this FileSystem to access Iceberg tables. Also, if for whatever
>>>> > reason the global nature of the Flink file system config is limiting,
>>>> > users could still revert to using the other FileIO implementations.
>>>> >
>>>> > What do you think? Would this be a useful addition to the
>>>> > Iceberg-Flink integration?
>>>> >
>>>> > Thanks,
>>>> > Peter
>>>> >
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Tabular
>>>
>>
>
> --
> Ryan Blue
> Tabular
>
