[ https://issues.apache.org/jira/browse/FLINK-17899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephan Ewen updated FLINK-17899:
---------------------------------
Description:
*Preamble:* To some extent, this whole discussion is only necessary because the
{{SourceReader}} receives the {{SourceOutput}} as a parameter of its
{{pollNext(...)}} method. However, this design follows from deeper design
choices in the runtime pipeline and is not easy to change at this stage.
There are some principal design choices here:
*(1) Do we make Timestamps and Watermarks purely a feature of the library
(ConnectorBase), or do we integrate them with the core (SourceOperator)?*
Making it purely a responsibility of the ConnectorBase would have the advantage
of keeping the SourceOperator simple. However, there is value in integrating
this with the SourceOperator.
- Implementations that do not use the ConnectorBase (like simple
collection- or iterator-based sources) would automatically get access to the
pluggable TimestampExtractors and WatermarkGenerators.
- When executing batch programs, the SourceOperator can transparently inject a
"no-op" WatermarkGenerator to make sure no Watermarks are generated during the
batch execution. Given that batch sources are very performance sensitive, it
seems better not to run the watermark generator logic at all than to drop the
watermarks later.
- In a future version, we may want to implement "global watermark holds"
generated by the Enumerators: the enumerator tells the readers how far they may
advance their local watermarks. This helps avoid prematurely advancing the
watermark based on one split's records while other splits still have data in
older time ranges. The streaming file source is a common example of this case.
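A minimal sketch of the batch optimization above, assuming simplified stand-ins loosely modelled on the FLIP-126 interfaces (watermarks are plain {{long}} values here), with hypothetical names {{NoWatermarksGenerator}}, {{BoundedOutOfOrdernessGenerator}} and {{GeneratorSelection.select}}. The operator picks the generator once at setup time, so in batch mode the user's watermark logic is never invoked per record:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the FLIP-126 interfaces (assumption: watermarks are plain longs).
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

/** Generates no watermarks at all; what the SourceOperator could inject for batch execution. */
final class NoWatermarksGenerator<T> implements WatermarkGenerator<T> {
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {}

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {}
}

/** Example streaming generator: highest timestamp seen so far minus a fixed bound. */
final class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so that a watermark emitted before any event is Long.MIN_VALUE, not an underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(maxTimestamp - maxOutOfOrderness - 1);
    }
}

final class GeneratorSelection {
    /** Hypothetical operator hook: swap in the no-op generator when running in batch mode. */
    static <T> WatermarkGenerator<T> select(boolean batchExecution, WatermarkGenerator<T> userGenerator) {
        return batchExecution ? new NoWatermarksGenerator<T>() : userGenerator;
    }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        WatermarkOutput output = watermark -> emitted.add(watermark);
        for (boolean batch : new boolean[] {false, true}) {
            emitted.clear();
            WatermarkGenerator<String> generator =
                    select(batch, new BoundedOutOfOrdernessGenerator<String>(5));
            generator.onEvent("event", 100L, output);
            generator.onPeriodicEmit(output);
            System.out.println((batch ? "batch: " : "streaming: ") + emitted);
        }
    }
}
{code}

Running {{main}} prints {{streaming: [94]}} followed by {{batch: []}}: the same user generator is configured in both cases, but in batch mode it is simply never called.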
*(2) Is per-partition watermarking purely a feature of the library
(ConnectorBase), or do we integrate it with the core (SourceOperator)?*
I believe we need to solve this on the same level as the previous question:
- Once a connector instantiates the per-partition watermark generators, the
main output (through which the SourceReader emits the records) must not run its
own watermark generator anymore. Otherwise we would also extract watermarks from
the merged stream, which defeats the purpose of per-partition watermarking: the
merged-stream generator would typically advance the watermark according to the
most advanced split. So simply placing the per-partition watermark generators in
the ConnectorBase and emitting transparently through an unchanged main output
would not work.
- So, if we decide to implement watermark support in the core
(SourceOperator), we would need to offer the per-partition watermarking
utilities on that level as well.
- Following the same line of thought as the batch point under (1), batch
execution can optimize watermark extraction by making the per-partition
extractors no-ops as well (they will most likely bear the bulk of the load in
the connectors).
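To make the first point concrete, here is a small illustration under a deliberately simplified assumption: a stream's watermark is just the highest timestamp seen (zero out-of-orderness), and the class and method names are hypothetical. Split {{a}} is ahead of split {{b}}; a single generator on the merged stream follows split {{a}}, while combining per-split watermarks by their minimum respects the slower split:

{code:java}
/** Illustration: one watermark over the merged stream vs. the minimum of per-split watermarks. */
final class MergedVsPerSplitWatermarks {

    /** Watermark of one stream under the simplified rule: the highest timestamp seen. */
    static long maxTimestamp(long[] timestamps) {
        long max = Long.MIN_VALUE; // no records yet: the watermark has not advanced
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    /** A generator on the merged stream effectively follows the most advanced split. */
    static long mergedWatermark(long[][] splits) {
        long max = Long.MIN_VALUE;
        for (long[] split : splits) {
            max = Math.max(max, maxTimestamp(split));
        }
        return max;
    }

    /** Per-split generators combined by the minimum are held back by the slowest split. */
    static long perSplitWatermark(long[][] splits) {
        if (splits.length == 0) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long[] split : splits) {
            min = Math.min(min, maxTimestamp(split));
        }
        return min;
    }

    public static void main(String[] args) {
        long[][] splits = {
            {100, 110}, // split a: already at timestamp 110
            {20, 30},   // split b: still reading older data
        };
        System.out.println("merged:    " + mergedWatermark(splits));
        System.out.println("per-split: " + perSplitWatermark(splits));
    }
}
{code}

With these inputs the merged watermark is 110, so a later record from split {{b}} with timestamp 35 would already count as late, whereas the per-split watermark of 30 still admits it.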
*(3) What would an integration of WatermarkGenerators with the SourceOperator
look like?*
This is rather straightforward: the SourceOperator instantiates a SourceOutput
that internally runs the timestamp extractor and watermark generator and emits
to the same DataOutput that the operator itself emits to.
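A minimal sketch of such a SourceOutput, assuming simplified stand-ins loosely modelled on the FLIP-126 and FLIP-27 interfaces (plain {{long}} watermarks, fewer methods). {{SimpleDataOutput}} and {{WatermarkingSourceOutput}} are hypothetical names standing in for the operator's DataOutput and for the output the operator would hand to the reader. The record is emitted before the generator sees it, so a watermark triggered by a record never overtakes that record:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins (assumption): the real interfaces use Watermark objects and have more methods.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

interface SourceOutput<T> {
    void collect(T record);
    void collect(T record, long timestamp);
}

/** Hypothetical stand-in for the DataOutput that the SourceOperator emits to. */
interface SimpleDataOutput<T> extends WatermarkOutput {
    void emitRecord(T record, long timestamp);
}

/** SourceOutput that runs timestamp extraction and watermark generation inline. */
final class WatermarkingSourceOutput<T> implements SourceOutput<T> {
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    private final TimestampAssigner<T> timestampAssigner;
    private final WatermarkGenerator<T> watermarkGenerator;
    private final SimpleDataOutput<T> dataOutput;

    WatermarkingSourceOutput(TimestampAssigner<T> timestampAssigner,
            WatermarkGenerator<T> watermarkGenerator, SimpleDataOutput<T> dataOutput) {
        this.timestampAssigner = timestampAssigner;
        this.watermarkGenerator = watermarkGenerator;
        this.dataOutput = dataOutput;
    }

    @Override
    public void collect(T record) {
        collect(record, NO_TIMESTAMP);
    }

    @Override
    public void collect(T record, long timestamp) {
        long assigned = timestampAssigner.extractTimestamp(record, timestamp);
        dataOutput.emitRecord(record, assigned); // emit the record first ...
        watermarkGenerator.onEvent(record, assigned, dataOutput); // ... then any watermark it triggers
    }

    /** Would be called by the operator from its periodic watermark timer. */
    void emitPeriodicWatermark() {
        watermarkGenerator.onPeriodicEmit(dataOutput);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SimpleDataOutput<Long> out = new SimpleDataOutput<Long>() {
            @Override public void emitRecord(Long record, long ts) { log.add("record " + record + "@" + ts); }
            @Override public void emitWatermark(long watermark) { log.add("watermark " + watermark); }
        };
        // Each record is its own event timestamp; emit "timestamp - 1" after every record.
        WatermarkGenerator<Long> punctuated = new WatermarkGenerator<Long>() {
            @Override public void onEvent(Long event, long ts, WatermarkOutput o) { o.emitWatermark(ts - 1); }
            @Override public void onPeriodicEmit(WatermarkOutput o) {}
        };
        SourceOutput<Long> sourceOutput =
                new WatermarkingSourceOutput<Long>((element, recordTs) -> element, punctuated, out);
        sourceOutput.collect(100L);
        sourceOutput.collect(120L);
        System.out.println(log);
    }
}
{code}

Running {{main}} prints {{[record 100@100, watermark 99, record 120@120, watermark 119]}}. Because the operator owns this output, it could equally pass a no-op generator here for batch execution without the reader noticing.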
*(4) What would an integration of the per-split WatermarkGenerators look like?*
I would propose to introduce an interface {{ReaderMainOutput}} which extends
{{SourceOutput}} and adds methods to create and release per-split outputs. The
{{SourceReader}} should accept a {{ReaderMainOutput}} instead of a
{{SourceOutput}}.
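{code:java}
import org.apache.flink.api.connector.source.SourceOutput;

/** Main output of a SourceReader that can also hand out per-split outputs. */
public interface ReaderMainOutput<T> extends SourceOutput<T> {

    @Override
    void collect(T record);

    @Override
    void collect(T record, long timestamp);

    /** Creates an output for the given split that runs its own watermark generator. */
    SourceOutput<T> createOutputForSplit(String splitId);

    /** Releases the output of the given split, e.g. once the split is finished. */
    void releaseOutputForSplit(String splitId);
}
{code}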
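One way an implementation could combine the per-split outputs is sketched below, assuming the same kind of simplified stand-ins (plain {{long}} watermarks, a hypothetical {{SimpleDataOutput}} downstream, no timestamp assigner, periodic emission omitted for brevity) and a hypothetical class name {{MinWatermarkMainOutput}}. Each split output runs its own generator, the main output forwards records without generating watermarks, and the operator-level watermark is the minimum over the currently registered splits, emitted only when it advances:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified stand-ins (assumption): plain long watermarks, no TimestampAssigner.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

interface SourceOutput<T> {
    void collect(T record);
    void collect(T record, long timestamp);
}

interface ReaderMainOutput<T> extends SourceOutput<T> {
    SourceOutput<T> createOutputForSplit(String splitId);
    void releaseOutputForSplit(String splitId);
}

/** Hypothetical stand-in for the DataOutput that the SourceOperator emits to. */
interface SimpleDataOutput<T> extends WatermarkOutput {
    void emitRecord(T record, long timestamp);
}

final class MinWatermarkMainOutput<T> implements ReaderMainOutput<T> {
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    private final Supplier<WatermarkGenerator<T>> generatorFactory;
    private final SimpleDataOutput<T> downstream;
    private final Map<String, Long> splitWatermarks = new HashMap<>();
    private long combinedWatermark = Long.MIN_VALUE;

    MinWatermarkMainOutput(Supplier<WatermarkGenerator<T>> generatorFactory, SimpleDataOutput<T> downstream) {
        this.generatorFactory = generatorFactory;
        this.downstream = downstream;
    }

    // The main output only forwards records; it deliberately runs no watermark generator.
    @Override
    public void collect(T record) { collect(record, NO_TIMESTAMP); }

    @Override
    public void collect(T record, long timestamp) { downstream.emitRecord(record, timestamp); }

    @Override
    public SourceOutput<T> createOutputForSplit(String splitId) {
        WatermarkGenerator<T> generator = generatorFactory.get();
        splitWatermarks.put(splitId, Long.MIN_VALUE); // a new split holds the watermark back
        WatermarkOutput splitOutput = watermark -> {
            splitWatermarks.merge(splitId, watermark, Long::max);
            updateCombinedWatermark();
        };
        // Assumption: the reader stops using this output once the split has been released.
        return new SourceOutput<T>() {
            @Override
            public void collect(T record) { collect(record, NO_TIMESTAMP); }

            @Override
            public void collect(T record, long timestamp) {
                downstream.emitRecord(record, timestamp);
                generator.onEvent(record, timestamp, splitOutput);
            }
        };
    }

    @Override
    public void releaseOutputForSplit(String splitId) {
        splitWatermarks.remove(splitId); // a finished split no longer holds the watermark back
        updateCombinedWatermark();
    }

    private void updateCombinedWatermark() {
        if (splitWatermarks.isEmpty()) {
            return;
        }
        long min = Long.MAX_VALUE;
        for (long watermark : splitWatermarks.values()) {
            min = Math.min(min, watermark);
        }
        if (min > combinedWatermark) { // the combined watermark only moves forward
            combinedWatermark = min;
            downstream.emitWatermark(min);
        }
    }
}
{code}

For example, with a generator that emits each record's timestamp as a watermark, a record at 100 on split {{a}} emits nothing (split {{b}} still holds the watermark back), a record at 20 on split {{b}} emits 20, and after split {{a}} reaches 110, releasing split {{b}} advances the combined watermark to 110.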
> Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources
> -------------------------------------------------------------------
>
> Key: FLINK-17899
> URL: https://issues.apache.org/jira/browse/FLINK-17899
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major