[jira] [Commented] (FLINK-19799) Make FileSource extensible

2021-04-16 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323630#comment-17323630
 ] 

Stephan Ewen commented on FLINK-19799:
--

This issue is fixed by the resolution of its subtasks.
[~lzljs3620320], please reopen this issue if we are still missing something with 
respect to extensibility.

> Make FileSource extensible
> --
>
> Key: FLINK-19799
> URL: https://issues.apache.org/jira/browse/FLINK-19799
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0
>
>
> The File System Source currently assumes all formats can represent their work 
> units as {{FileSourceSplit}}. If that is not the case, the formats cannot be 
> implemented using the {{FileSource}}.
> We need to support extending the splits to carry additional information in 
> the splits, and to use that information when creating bulk readers and 
> handling split state.  
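A minimal, self-contained Java sketch of the idea in the description: the source and the reader factory are generic in the split type, so a format can define a split subclass that carries extra information and use it when creating readers. All names here (BaseFileSplit, RowGroupSplit, SplitReaderFactory, GenericFileSource) are hypothetical stand-ins for illustration, not Flink's actual FileSource API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a basic file split: a byte range of one file.
class BaseFileSplit {
    final String splitId;
    final String path;
    final long offset;
    final long length;

    BaseFileSplit(String splitId, String path, long offset, long length) {
        this.splitId = splitId;
        this.path = path;
        this.offset = offset;
        this.length = length;
    }
}

// Hypothetical format-specific split carrying extra information,
// e.g. which row group of a columnar file it covers.
class RowGroupSplit extends BaseFileSplit {
    final int rowGroup;

    RowGroupSplit(String splitId, String path, long offset, long length, int rowGroup) {
        super(splitId, path, offset, length);
        this.rowGroup = rowGroup;
    }
}

// Reader factory generic in the split type: it receives the concrete
// split class and can therefore read the format's extra fields.
interface SplitReaderFactory<SplitT extends BaseFileSplit> {
    List<String> read(SplitT split);
}

// Source that is generic in the split type as well, so the type
// information flows from the source to the reader factory.
class GenericFileSource<SplitT extends BaseFileSplit> {
    private final SplitReaderFactory<SplitT> readerFactory;

    GenericFileSource(SplitReaderFactory<SplitT> readerFactory) {
        this.readerFactory = readerFactory;
    }

    // Reads all splits in order and concatenates the produced records.
    List<String> readAll(List<SplitT> splits) {
        List<String> out = new ArrayList<>();
        for (SplitT split : splits) {
            out.addAll(readerFactory.read(split));
        }
        return out;
    }
}
```

A GenericFileSource<RowGroupSplit> hands RowGroupSplit instances to its reader factory, so no downcast from a fixed base split type is needed.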



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19799) Make FileSource extensible

2021-04-16 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322926#comment-17322926
 ] 

Flink Jira Bot commented on FLINK-19799:


This issue is assigned but has not received an update in 7 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.



[jira] [Commented] (FLINK-19799) Make FileSource extensible

2020-10-27 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221298#comment-17221298
 ] 

Jingsong Lee commented on FLINK-19799:
--

> the checkpoint state is in general a combination of the enumerator's state 
> and the assigner's state

Yes, I think that can solve the issue. For FileSource, the state is currently 
{{Collection splits}} and {{Collection alreadyProcessedPaths}}.

> Make FileSource extensible
> --
>
> Key: FLINK-19799
> URL: https://issues.apache.org/jira/browse/FLINK-19799
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.12.0
>
>
> The File System Source currently assumes all formats can represent their work 
> units as {{FileSourceSplit}}. If that is not the case, the formats cannot be 
> implemented using the {{FileSource}}.
> We need to support extending the splits to carry additional information in 
> the splits, and to use that information when creating bulk readers and 
> handling split state.  





[jira] [Commented] (FLINK-19799) Make FileSource extensible

2020-10-27 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221291#comment-17221291
 ] 

Stephan Ewen commented on FLINK-19799:
--

Thanks, Steven. [~lzljs3620320], I think you mentioned something similar in a 
separate discussion.

Is it fair to say that for the FileSourceEnumerator, the checkpoint state is in 
general a combination of the enumerator's state and the assigner's state? If we 
generify it like that, would that solve the issues?
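To illustrate the generification being asked about, here is a minimal Java sketch, assuming the enumerator keeps its own state (the paths it has already processed) and treats the assigner's state as an opaque type parameter chosen by each assigner. EnumeratorCheckpoint, StatefulAssigner and Enumerator are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical checkpoint: the enumerator's own state (already processed
// paths) plus an assigner state whose type the assigner decides.
final class EnumeratorCheckpoint<AssignerStateT> {
    private final Collection<String> alreadyProcessedPaths;
    private final AssignerStateT assignerState;

    EnumeratorCheckpoint(Collection<String> alreadyProcessedPaths, AssignerStateT assignerState) {
        // Defensive, read-only copy so later enumerator changes do not leak in.
        this.alreadyProcessedPaths = Collections.unmodifiableList(new ArrayList<>(alreadyProcessedPaths));
        this.assignerState = assignerState;
    }

    Collection<String> alreadyProcessedPaths() { return alreadyProcessedPaths; }

    AssignerStateT assignerState() { return assignerState; }
}

// Hypothetical assigner contract: each assigner picks its own state type.
interface StatefulAssigner<AssignerStateT> {
    AssignerStateT snapshotState();
}

// Hypothetical enumerator that snapshots both parts into one checkpoint.
final class Enumerator<AssignerStateT> {
    private final List<String> processedPaths = new ArrayList<>();
    private final StatefulAssigner<AssignerStateT> assigner;

    Enumerator(StatefulAssigner<AssignerStateT> assigner) {
        this.assigner = assigner;
    }

    void markProcessed(String path) {
        processedPaths.add(path);
    }

    EnumeratorCheckpoint<AssignerStateT> snapshotState() {
        return new EnumeratorCheckpoint<>(processedPaths, assigner.snapshotState());
    }
}
```

In this shape, the pending splits that FileSource stores today would simply become part of whatever state type the chosen assigner returns.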



[jira] [Commented] (FLINK-19799) Make FileSource extensible

2020-10-26 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221102#comment-17221102
 ] 

Steven Zhen Wu commented on FLINK-19799:


We just went through a similar exercise with the Iceberg source PoC. We want to 
make the assigner pluggable (no ordering/locality guarantee, some ordering 
guarantee, locality-aware, etc.). Different assigners may have different state 
types for checkpoints. That is why we had to add generic types for the assigner 
state and its serializer too.

We did make IcebergSource generic
{code}
public class IcebergSource,
SplitAssignerStateSerializerT extends 
SplitAssignerStateSerializer>
{code}
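A minimal Java sketch of a pluggable assigner whose checkpoint state type is a type parameter, paired with a matching state serializer. PluggableAssigner, AssignerStateSerializer and SimpleAssigner are hypothetical names that do not reflect the actual Iceberg PoC interfaces; the serializer uses plain java.io streams for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical serializer contract for an assigner's checkpoint state.
interface AssignerStateSerializer<StateT> {
    byte[] serialize(StateT state) throws IOException;

    StateT deserialize(byte[] bytes) throws IOException;
}

// Hypothetical pluggable assigner: the state type is a type parameter,
// and each assigner supplies the serializer that matches it.
interface PluggableAssigner<StateT> {
    Optional<String> getNext();

    StateT snapshotState();

    AssignerStateSerializer<StateT> stateSerializer();
}

// Simple assigner with no ordering or locality guarantee beyond insertion
// order; its state is the list of split ids not yet handed out.
final class SimpleAssigner implements PluggableAssigner<List<String>> {
    private final ArrayDeque<String> pending;

    SimpleAssigner(List<String> splitIds) {
        this.pending = new ArrayDeque<>(splitIds);
    }

    public Optional<String> getNext() {
        return Optional.ofNullable(pending.poll());
    }

    public List<String> snapshotState() {
        return new ArrayList<>(pending);
    }

    public AssignerStateSerializer<List<String>> stateSerializer() {
        return new AssignerStateSerializer<List<String>>() {
            // Format: element count, then each split id as modified UTF-8.
            public byte[] serialize(List<String> state) throws IOException {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (DataOutputStream out = new DataOutputStream(bytes)) {
                    out.writeInt(state.size());
                    for (String id : state) {
                        out.writeUTF(id);
                    }
                }
                return bytes.toByteArray();
            }

            public List<String> deserialize(byte[] data) throws IOException {
                try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                    int n = in.readInt();
                    List<String> state = new ArrayList<>(n);
                    for (int i = 0; i < n; i++) {
                        state.add(in.readUTF());
                    }
                    return state;
                }
            }
        };
    }
}
```

Because the serializer travels with the assigner, a locality-aware assigner could checkpoint a richer state type without any change to the code that stores the checkpoint bytes.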

We simplified the construction with a builder.
{code}
  public static  Builder useSimpleAssigner(TableLoader tableLoader) {
    SimpleSplitAssignerFactory assignerFactory = new SimpleSplitAssignerFactory();
    return new Builder<>(tableLoader, assignerFactory);
  }
{code}

The end result is still simple for users if they don't need to keep a reference 
to the IcebergSource object.
{code}
final DataStream stream = env.fromSource(
IcebergSource.useSimpleAssigner(tableLoader())
.iteratorFactory(new RowDataIteratorFactory())
.config(config)
.scanContext(scanContext)
.build(),
...
{code}
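A minimal Java sketch of the builder technique, assuming the static factory's only job is to fix the assigner state type parameter so callers chain builder methods without ever writing type arguments. GenericSource, its useSimpleAssigner and config methods are hypothetical simplifications, not the IcebergSource builder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical generic source; StateT is the assigner's checkpoint state type.
final class GenericSource<StateT> {
    private final String assignerName;
    private final Supplier<StateT> initialState;
    private final String config;

    private GenericSource(String assignerName, Supplier<StateT> initialState, String config) {
        this.assignerName = assignerName;
        this.initialState = initialState;
        this.config = config;
    }

    String assignerName() { return assignerName; }

    String config() { return config; }

    // Creates a fresh, empty assigner state of the fixed type.
    StateT newInitialState() { return initialState.get(); }

    // The static factory fixes StateT = List<String>, so callers never spell it out.
    static Builder<List<String>> useSimpleAssigner() {
        return new Builder<>("simple", ArrayList::new);
    }

    static final class Builder<StateT> {
        private final String assignerName;
        private final Supplier<StateT> initialState;
        private String config = "";

        private Builder(String assignerName, Supplier<StateT> initialState) {
            this.assignerName = assignerName;
            this.initialState = initialState;
        }

        Builder<StateT> config(String config) {
            this.config = config;
            return this;
        }

        GenericSource<StateT> build() {
            return new GenericSource<>(assignerName, initialState, config);
        }
    }
}
```

The type argument appears only inside useSimpleAssigner(); a caller passing the result straight to env.fromSource(...) never has to name it, which matches the point that the API stays simple as long as users do not keep a typed reference to the source.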
