[jira] [Commented] (FLINK-19799) Make FileSource extensible
[ https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323630#comment-17323630 ]

Stephan Ewen commented on FLINK-19799:
--

This issue is fixed by the resolution of its subtasks.

[~lzljs3620320], please reopen this issue if we are still missing something with respect to extensibility.

> Make FileSource extensible
> --
>
> Key: FLINK-19799
> URL: https://issues.apache.org/jira/browse/FLINK-19799
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0
>
>
> The File System Source currently assumes all formats can represent their work
> units as {{FileSourceSplit}}. If that is not the case, the formats cannot be
> implemented using the {{FileSource}}.
> We need to support extending the splits to carry additional information,
> and to use that information when creating bulk readers and handling split state.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
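The issue description asks for splits that carry extra information, and for that information to be used when creating readers and handling split state. A minimal, self-contained sketch of the idea follows. `BaseFileSplit`, `RowGroupSplit`, `SplitReaderFactory` and `RowGroupReaderFactory` are hypothetical stand-ins, not Flink's `FileSourceSplit` or bulk-format API, and the "reader" only returns a description string instead of opening a file.

```java
import java.util.Objects;

// Hypothetical base split: a file path plus a byte range.
class BaseFileSplit {
    final String path;
    final long offset;
    final long length;

    BaseFileSplit(String path, long offset, long length) {
        this.path = Objects.requireNonNull(path, "path");
        this.offset = offset;
        this.length = length;
    }
}

// Hypothetical format-specific split carrying extra information:
// which row groups of the file this split covers.
class RowGroupSplit extends BaseFileSplit {
    final int firstRowGroup;
    final int numRowGroups;

    RowGroupSplit(String path, long offset, long length, int firstRowGroup, int numRowGroups) {
        super(path, offset, length);
        this.firstRowGroup = firstRowGroup;
        this.numRowGroups = numRowGroups;
    }

    // Split state handling: the remaining work after some progress is expressed
    // in the same extended split type, so the extra fields survive checkpoints.
    RowGroupSplit advance(int rowGroupsConsumed) {
        if (rowGroupsConsumed < 0 || rowGroupsConsumed > numRowGroups) {
            throw new IllegalArgumentException("invalid progress: " + rowGroupsConsumed);
        }
        return new RowGroupSplit(path, offset, length,
                firstRowGroup + rowGroupsConsumed, numRowGroups - rowGroupsConsumed);
    }
}

// Reader factory generic over the split type, so a format sees its own split fields.
interface SplitReaderFactory<SplitT extends BaseFileSplit> {
    String describeReader(SplitT split);
}

class RowGroupReaderFactory implements SplitReaderFactory<RowGroupSplit> {
    @Override
    public String describeReader(RowGroupSplit split) {
        // A real reader would open the file; this only reports what it would read.
        return split.path + " row groups [" + split.firstRowGroup + ", "
                + (split.firstRowGroup + split.numRowGroups) + ")";
    }
}
```

The key design point is the type parameter on the factory: a format that needs more than a path and byte range declares its own split subclass and receives exactly that type, without casts.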
[jira] [Commented] (FLINK-19799) Make FileSource extensible
[ https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322926#comment-17322926 ]

Flink Jira Bot commented on FLINK-19799:

This issue is assigned but has not received an update in 7 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.
[jira] [Commented] (FLINK-19799) Make FileSource extensible
[ https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221298#comment-17221298 ]

Jingsong Lee commented on FLINK-19799:
--

> the checkpoint state is in general a combination of the enumerator's state
> and the assigner's state

I think yes, that can solve the issue. For FileSource today, the state consists of {{Collection splits}} and {{Collection alreadyProcessedPaths}}.

> Make FileSource extensible
> --
>
> Key: FLINK-19799
> URL: https://issues.apache.org/jira/browse/FLINK-19799
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.12.0
[jira] [Commented] (FLINK-19799) Make FileSource extensible
[ https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221291#comment-17221291 ]

Stephan Ewen commented on FLINK-19799:
--

Thanks, Steven.

[~lzljs3620320], I think you mentioned something similar in a separate discussion. Is it fair to say that for the FileSourceEnumerator, the checkpoint state is in general a combination of the enumerator's state and the assigner's state? If we generify it along those lines, would that solve the issues?
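The proposal (checkpoint state = enumerator state + assigner state, generified) can be sketched as a checkpoint type that is generic only in the assigner's part. This is a minimal sketch under stated assumptions: every class name here is hypothetical, paths and splits are plain strings rather than Flink's `Path` and `FileSourceSplit`, and it is not the actual FileSourceEnumerator checkpoint class.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical generified checkpoint: the enumerator's own state (paths already
// processed) plus whatever state the pluggable assigner chooses to keep.
final class EnumeratorCheckpoint<AssignerStateT> {
    private final Collection<String> alreadyProcessedPaths;
    private final AssignerStateT assignerState;

    EnumeratorCheckpoint(Collection<String> alreadyProcessedPaths, AssignerStateT assignerState) {
        this.alreadyProcessedPaths =
                Collections.unmodifiableList(new ArrayList<String>(alreadyProcessedPaths));
        this.assignerState = assignerState;
    }

    Collection<String> alreadyProcessedPaths() { return alreadyProcessedPaths; }

    AssignerStateT assignerState() { return assignerState; }
}

// Hypothetical assigner contract: each assigner snapshots into its own state type.
interface StatefulAssigner<AssignerStateT> {
    AssignerStateT snapshotState();
}

// Simplest case: the assigner's state is the list of splits not yet handed out.
final class PendingSplitsAssigner implements StatefulAssigner<List<String>> {
    private final List<String> pendingSplits = new ArrayList<>();

    void add(String split) { pendingSplits.add(split); }

    @Override
    public List<String> snapshotState() { return new ArrayList<>(pendingSplits); }
}

// The enumerator builds the combined checkpoint without knowing the
// assigner's concrete state type.
final class FileEnumerator<AssignerStateT> {
    private final StatefulAssigner<AssignerStateT> assigner;
    private final List<String> processedPaths = new ArrayList<>();

    FileEnumerator(StatefulAssigner<AssignerStateT> assigner) { this.assigner = assigner; }

    void markProcessed(String path) { processedPaths.add(path); }

    EnumeratorCheckpoint<AssignerStateT> snapshotState() {
        return new EnumeratorCheckpoint<>(processedPaths, assigner.snapshotState());
    }
}
```

With the simple assigner plugged in, the checkpoint holds the same two pieces Jingsong lists for FileSource (pending splits and processed paths); a different assigner only changes `AssignerStateT`.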
[jira] [Commented] (FLINK-19799) Make FileSource extensible
[ https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221102#comment-17221102 ]

Steven Zhen Wu commented on FLINK-19799:

We just went through a similar exercise with the Iceberg source PoC. We want to make the assigner pluggable (no ordering/locality guarantee, some ordering guarantee, locality-aware, etc.). Different assigners may have different state types for checkpoints. That is why we also had to add generic types for the assigner state and its serializer.

We did make IcebergSource generic:
{code}
public class IcebergSource, SplitAssignerStateSerializerT extends SplitAssignerStateSerializer>
{code}

We simplified construction in the builder:
{code}
public static Builder useSimpleAssigner(TableLoader tableLoader) {
  SimpleSplitAssignerFactory assignerFactory = new SimpleSplitAssignerFactory();
  return new Builder<>(tableLoader, assignerFactory);
}
{code}

The end result is still simple for users, as long as they don't need to keep a reference to the IcebergSource object:
{code}
final DataStream stream = env.fromSource(
    IcebergSource.useSimpleAssigner(tableLoader())
        .iteratorFactory(new RowDataIteratorFactory())
        .config(config)
        .scanContext(scanContext)
        .build(),
    ...
{code}
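The pattern described in this comment (assigner-specific state types, a matching state serializer, and a static builder method that fixes the type parameters so users never write them) can be sketched in plain Java. Every type below is hypothetical and is not the Iceberg PoC's real code; the "unordered" assigner's state is assumed to be just its list of pending split IDs, serialized with `DataOutputStream`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical serializer for an assigner's checkpoint state.
interface AssignerStateSerializer<StateT> {
    byte[] serialize(StateT state) throws IOException;
    StateT deserialize(byte[] bytes) throws IOException;
}

// Hypothetical pluggable assigner; each implementation picks its own state type.
interface PluggableAssigner<StateT> {
    StateT snapshotState();
}

// Factory tying an assigner to the serializer for its state type.
interface AssignerFactory<StateT, SerializerT extends AssignerStateSerializer<StateT>> {
    PluggableAssigner<StateT> createAssigner();
    SerializerT createSerializer();
}

// "No ordering guarantee" assigner: its state is just the pending split IDs.
final class UnorderedAssigner implements PluggableAssigner<List<String>> {
    final List<String> pending = new ArrayList<>();

    @Override
    public List<String> snapshotState() { return new ArrayList<>(pending); }
}

final class SplitListSerializer implements AssignerStateSerializer<List<String>> {
    @Override
    public byte[] serialize(List<String> state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size());          // element count first
            for (String split : state) {
                out.writeUTF(split);             // then each split ID
            }
        }
        return bytes.toByteArray();
    }

    @Override
    public List<String> deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int n = in.readInt();
            List<String> splits = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                splits.add(in.readUTF());
            }
            return splits;
        }
    }
}

final class UnorderedAssignerFactory implements AssignerFactory<List<String>, SplitListSerializer> {
    @Override
    public PluggableAssigner<List<String>> createAssigner() { return new UnorderedAssigner(); }

    @Override
    public SplitListSerializer createSerializer() { return new SplitListSerializer(); }
}

// The builder carries both type parameters; the static method fixes them,
// so call sites never spell out the generics.
final class SourceBuilder<StateT, SerializerT extends AssignerStateSerializer<StateT>> {
    final AssignerFactory<StateT, SerializerT> assignerFactory;

    private SourceBuilder(AssignerFactory<StateT, SerializerT> assignerFactory) {
        this.assignerFactory = assignerFactory;
    }

    static SourceBuilder<List<String>, SplitListSerializer> useUnorderedAssigner() {
        return new SourceBuilder<>(new UnorderedAssignerFactory());
    }
}
```

Because `useUnorderedAssigner()` fixes both type parameters in its return type, callers can chain builder calls without naming `List<String>` or `SplitListSerializer`, which is the same trade-off as the `useSimpleAssigner(tableLoader)` method in the comment. The generics only become visible if a caller stores the source in a typed variable.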