[ 
https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221102#comment-17221102
 ] 

Steven Zhen Wu edited comment on FLINK-19799 at 10/27/20, 3:17 AM:
-------------------------------------------------------------------

We just went through a similar exercise with the [Iceberg source 
PoC|https://github.com/stevenzwu/iceberg/pull/2/files]. We want to make the 
assigner pluggable (no ordering/locality guarantee, some ordering guarantee, 
locality-aware, etc.). Different assigners may have different state types for 
checkpointing. That is why we had to add generic types for the assigner state 
and its serializer too.

We did make IcebergSource generic:
{code}
public class IcebergSource<
    T,
    SplitAssignerStateT extends SplitAssignerState,
    SplitAssignerT extends SplitAssigner<SplitAssignerStateT>,
    SplitAssignerStateSerializerT extends SplitAssignerStateSerializer<SplitAssignerStateT>>
{code}
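
To illustrate why the state and serializer types travel together with the assigner, here is a minimal, self-contained sketch. It is not the PoC code: the type names mirror the ones above, but every method signature ({{getNext}}, {{snapshotState}}, {{serialize}}, {{deserialize}}) and the use of plain strings as splits are assumptions made for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical marker for whatever an assigner needs to checkpoint.
interface SplitAssignerState {}

// Hypothetical assigner contract; the method names are illustrative only.
interface SplitAssigner<S extends SplitAssignerState> {
  Optional<String> getNext(); // next split to hand out, if any
  S snapshotState();          // state to store in a checkpoint
}

// Hypothetical serializer contract, typed by the state it understands.
interface SplitAssignerStateSerializer<S extends SplitAssignerState> {
  byte[] serialize(S state);
  S deserialize(byte[] bytes);
}

// Simplest case: a list of pending splits, no ordering or locality guarantee.
final class SimpleSplitAssignerState implements SplitAssignerState {
  final List<String> pendingSplits;

  SimpleSplitAssignerState(List<String> pendingSplits) {
    this.pendingSplits = new ArrayList<>(pendingSplits);
  }
}

final class SimpleSplitAssigner implements SplitAssigner<SimpleSplitAssignerState> {
  private final Deque<String> pending;

  SimpleSplitAssigner(SimpleSplitAssignerState state) {
    this.pending = new ArrayDeque<>(state.pendingSplits);
  }

  @Override
  public Optional<String> getNext() {
    return Optional.ofNullable(pending.poll());
  }

  @Override
  public SimpleSplitAssignerState snapshotState() {
    return new SimpleSplitAssignerState(new ArrayList<>(pending));
  }
}

// Toy encoding: one split name per line (names must not contain newlines).
final class SimpleSplitAssignerStateSerializer
    implements SplitAssignerStateSerializer<SimpleSplitAssignerState> {
  @Override
  public byte[] serialize(SimpleSplitAssignerState state) {
    return String.join("\n", state.pendingSplits).getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public SimpleSplitAssignerState deserialize(byte[] bytes) {
    String text = new String(bytes, StandardCharsets.UTF_8);
    List<String> splits =
        text.isEmpty() ? new ArrayList<String>() : Arrays.asList(text.split("\n"));
    return new SimpleSplitAssignerState(splits);
  }
}
```

An assigner with ordering or locality guarantees would presumably need a richer state class and a matching serializer, which is what forces the extra type parameters on the source.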

We simplified construction in the builder:
{code}
  public static <T> Builder<T, SimpleSplitAssignerState, SimpleSplitAssigner,
      SimpleSplitAssignerStateSerializer> useSimpleAssigner(TableLoader tableLoader) {
    SimpleSplitAssignerFactory assignerFactory = new SimpleSplitAssignerFactory();
    return new Builder<>(tableLoader, assignerFactory);
  }
{code}
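
The factory-method idea can be shown in isolation with a reduced, self-contained sketch. All names here ({{SketchSource}}, {{SketchState}}, {{SketchAssigner}} and so on) are hypothetical stand-ins rather than the PoC classes, the serializer type parameter is left out for brevity, and a plain string stands in for the table loader.

```java
// Hypothetical stand-ins for the assigner state and assigner types.
interface SketchState {}

interface SketchAssigner<S extends SketchState> {}

final class SketchSimpleState implements SketchState {}

final class SketchSimpleAssigner implements SketchAssigner<SketchSimpleState> {}

// A source that is generic in the record type T and in the assigner types.
final class SketchSource<T, S extends SketchState, A extends SketchAssigner<S>> {
  final String table;
  final A assigner;

  private SketchSource(String table, A assigner) {
    this.table = table;
    this.assigner = assigner;
  }

  // The static factory pins S and A, so the caller only chooses T.
  static <T> Builder<T, SketchSimpleState, SketchSimpleAssigner> useSimpleAssigner(
      String table) {
    return new Builder<>(table, new SketchSimpleAssigner());
  }

  static final class Builder<T, S extends SketchState, A extends SketchAssigner<S>> {
    private final String table;
    private final A assigner;

    Builder(String table, A assigner) {
      this.table = table;
      this.assigner = assigner;
    }

    SketchSource<T, S, A> build() {
      return new SketchSource<>(table, assigner);
    }
  }
}

final class SketchSourceDemo {
  public static void main(String[] args) {
    // Holding a reference requires spelling out every type argument ...
    SketchSource<String, SketchSimpleState, SketchSimpleAssigner> source =
        SketchSource.<String>useSimpleAssigner("db.table").build();
    System.out.println(source.table);
    // ... while passing the result straight to another call needs only <String>.
  }
}
```

The long type list only appears when the source is assigned to a variable; an inline call needs just the record type.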

The end result is still simple for users if they don't need to keep a reference 
to the IcebergSource object.
{code}
    final DataStream<RowData> stream = env.fromSource(
        IcebergSource.<RowData>useSimpleAssigner(tableLoader())
            .iteratorFactory(new RowDataIteratorFactory())
            .config(config)
            .scanContext(scanContext)
            .build(),
    ...
{code}


> Make FileSource extensible
> --------------------------
>
>                 Key: FLINK-19799
>                 URL: https://issues.apache.org/jira/browse/FLINK-19799
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Major
>             Fix For: 1.12.0
>
>
> The File System Source currently assumes all formats can represent their work 
> units as {{FileSourceSplit}}. If that is not the case, the formats cannot be 
> implemented using the {{FileSource}}.
> We need to support extending the splits to carry additional information, and 
> to use that information when creating bulk readers and handling split state.


