[jira] [Created] (FLINK-32509) avoid using skip in InputStreamFSInputWrapper.seek

2023-07-01 Thread Libin Qin (Jira)
Libin Qin created FLINK-32509:
-

 Summary: avoid using skip in InputStreamFSInputWrapper.seek
 Key: FLINK-32509
 URL: https://issues.apache.org/jira/browse/FLINK-32509
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.18.0
Reporter: Libin Qin


Unlike read, the skip method of InputStream does not return -1 to signal end
of file.

The javadoc of InputStream says: "The skip method may, for a variety of
reasons, end up skipping over some smaller number of bytes, possibly 0."

FileInputStream goes further: it allows skipping an arbitrary number of bytes
past the end of the file.

As a result, the seek method of InputStreamFSInputWrapper can loop forever
when the desired position is beyond the end of the file.
 
I reproduced it with the following case, which never returns:

```java
byte[] bytes = "flink".getBytes();
try (InputStream inputStream = new ByteArrayInputStream(bytes)) {
    InputStreamFSInputWrapper wrapper = new InputStreamFSInputWrapper(inputStream);
    wrapper.seek(20); // seek() keeps calling skip(), which returns 0 at EOF
}
```
There is a commons-io issue that discusses the same problem with skip:
https://issues.apache.org/jira/browse/IO-203
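For illustration, a defensive seek loop can avoid the infinite loop by probing
for EOF with a single-byte read whenever skip makes no progress. This is only a
sketch of the general pattern (similar to what the commons-io discussion
suggests), not the actual Flink code; the class and method names below are
made up for the example:

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class SafeSeekDemo {

    // Hypothetical replacement for a skip-based seek loop: skip() returning 0
    // is ambiguous, so fall back to read() to distinguish "no progress" from
    // "end of stream" and fail fast instead of spinning forever.
    static void seek(InputStream in, long desired) throws IOException {
        long remaining = desired;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped > 0) {
                remaining -= skipped;
            } else if (in.read() >= 0) {
                // skip() made no progress but the stream is not at EOF;
                // the single read() above consumed exactly one byte.
                remaining -= 1;
            } else {
                throw new EOFException(
                        "Seek position " + desired + " is past end of stream");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = "flink".getBytes();
        // Seeking past EOF now fails with an exception instead of hanging.
        try (InputStream in = new ByteArrayInputStream(bytes)) {
            try {
                seek(in, 20);
            } catch (EOFException e) {
                System.out.println("EOFException");
            }
        }
        // A valid seek still works: position 3 of "flink" is 'n'.
        try (InputStream in = new ByteArrayInputStream(bytes)) {
            seek(in, 3);
            System.out.println((char) in.read());
        }
    }
}
```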

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-01 Thread Dong Lin
Hi Piotr,

Thank you for providing further suggestions to help improve the API. Please
see my comments inline.

On Fri, Jun 30, 2023 at 10:35 PM Piotr Nowojski 
wrote:

> Hey,
>
> Sorry for a late reply, I was OoO for a week. I have three things to point
> out.
>
> 1. ===
>
> The updated proposal is indeed better, but to be honest I still don't like
> it, for mostly the same reasons that I have mentioned earlier:
> - only a partial solution, that doesn't address all use cases, so we would
> need to throw it away sooner or later
> - I don't see and it hasn't been discussed how to make it work out of the
> box for all sources
> - somehow complicating API for people implementing Sources
> - it should work out of the box for most of the sources, or at least to
> have that potential in the future
>

The comments above seem rather abstract. I am hoping to understand more of
the technical details behind them so that we can see how to address the
concerns. For example, even if a FLIP does not address all use cases (which
is arguably true for every FLIP), its solution does not necessarily need to
be thrown away later as long as it is extensible. So we probably need to
understand specifically why the proposed APIs would have to be thrown away.

Similarly, we would need to understand if there is a better design to make
the API simpler and work out of the box etc. in order to decide how to
address these comments.


> On top of that:
> - the FLIP I think is missing how to hook up SplitEnumeratorContext and
> CheckpointCoordinator to pass "isProcessingBacklog"
>

I think it can be passed via the following function chain:
- CheckpointCoordinator invokes
OperatorCoordinatorCheckpointContext#isProcessingBacklog (via
coordinatorsToCheckpoint) to get this information.
- OperatorCoordinatorHolder implements
OperatorCoordinatorCheckpointContext#isProcessingBacklog and returns
OperatorCoordinator#isProcessingBacklog (via coordinator)
- SourceCoordinator implements OperatorCoordinator#isProcessingBacklog and
returns SourceCoordinatorContext#isProcessingBacklog
- SourceCoordinatorContext will implement
SplitEnumeratorContext#setIsProcessingBacklog and stores the given
information in a variable.
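As a sanity check, the chain above can be sketched with stand-in classes. The
names below mirror but deliberately simplify the real internal classes (the
actual interfaces and signatures may differ); the point is only that a flag
set via the enumerator context becomes visible to the checkpoint coordinator
through a chain of delegating getters:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class BacklogChainSketch {

    // Stands in for SourceCoordinatorContext, which would implement
    // SplitEnumeratorContext#setIsProcessingBacklog and store the flag.
    static class SourceCoordinatorContext {
        private final AtomicBoolean processingBacklog = new AtomicBoolean(false);

        void setIsProcessingBacklog(boolean b) {
            processingBacklog.set(b);
        }

        boolean isProcessingBacklog() {
            return processingBacklog.get();
        }
    }

    // Stands in for SourceCoordinator (an OperatorCoordinator),
    // delegating to its context.
    static class SourceCoordinator {
        final SourceCoordinatorContext context = new SourceCoordinatorContext();

        boolean isProcessingBacklog() {
            return context.isProcessingBacklog();
        }
    }

    // Stands in for OperatorCoordinatorHolder, which the CheckpointCoordinator
    // would see as an OperatorCoordinatorCheckpointContext.
    static class OperatorCoordinatorHolder {
        final SourceCoordinator coordinator = new SourceCoordinator();

        boolean isProcessingBacklog() {
            return coordinator.isProcessingBacklog();
        }
    }

    public static void main(String[] args) {
        OperatorCoordinatorHolder holder = new OperatorCoordinatorHolder();
        // The enumerator flips the flag via its context...
        holder.coordinator.context.setIsProcessingBacklog(true);
        // ...and the CheckpointCoordinator reads it through the chain.
        System.out.println(holder.isProcessingBacklog()); // prints true
    }
}
```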

Note that this involves only internal APIs. We might be able to find a
simpler solution with fewer functions on the path. As long as the above
solution works without any performance or correctness issues, I think maybe
we should focus on the public API design and discuss the implementation
details during the PR review?

> - the FLIP suggests to use the long checkpointing interval as long as any
> subtask is processing the backlog. Are you sure that's the right call? What
> if other
>   sources are producing fresh records, and those fresh records are reaching
> sinks? It could happen either with disjoint JobGraph, embarrassing parallel
>   JobGraph (no keyBy/unions/joins), or even with keyBy. Fresh records can
> slip using a not backpressured input channel through generally
> backpressured
>   keyBy exchange. How should we handle that? This problem I think will
> affect every solution, including my previously proposed generic one, but we
> should
>   discuss how to handle that as well.
>

Good question. Here is my plan to improve the solution in a follow-up FLIP:

- Let every subtask of every source operator emit
RecordAttributes(isBacklog=..)
- Let every subtask of every operator handle the RecordAttributes received
from inputs and emit RecordAttributes to downstream operators. Flink
runtime can derive this information for every one-input operator. For an
operator with two inputs, if one input has isBacklog=true and the other has
isBacklog=false, the operator should determine the isBacklog for its output
records based on its semantics.
- If there exists a subtask of a two-phase commit operator with
isBacklog=false, the operator should let JM know so that the JM will use
the short checkpoint interval (for data freshness). Otherwise, JM will use
the long checkpoint interval.
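The decision rule in the last step above can be sketched as follows. This is
a hypothetical helper written for this email, not a proposed API: given the
isBacklog flags reported by the subtasks of two-phase commit operators, the
JM picks the short interval if any of them is not processing backlog:

```java
import java.util.List;

public class IntervalSelectionSketch {

    // If any two-phase commit subtask is NOT processing backlog, use the
    // short interval (for data freshness); otherwise use the long interval.
    static long chooseIntervalMs(
            List<Boolean> twoPhaseCommitSubtaskBacklogFlags,
            long shortIntervalMs,
            long longIntervalMs) {
        for (boolean isBacklog : twoPhaseCommitSubtaskBacklogFlags) {
            if (!isBacklog) {
                return shortIntervalMs;
            }
        }
        return longIntervalMs;
    }

    public static void main(String[] args) {
        // All subtasks in backlog: long interval.
        System.out.println(
                chooseIntervalMs(List.of(true, true), 30_000L, 300_000L));
        // One subtask is fresh: short interval.
        System.out.println(
                chooseIntervalMs(List.of(true, false), 30_000L, 300_000L));
    }
}
```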

The above solution guarantees that, if every two-input operator explicitly
specifies its isBacklog based on its inputs' isBacklog, then the JM will use
the short checkpoint interval if and only if it is useful for at least one
subtask of one two-phase commit operator.

Note that even the above solution might not be perfect. Suppose one subtask
of a two-phase commit operator has isBacklog=false, but every other subtask
of that operator has isBacklog=true due to load imbalance. In this case, it
might be beneficial to use the long checkpoint interval to improve the
average data freshness for this operator. However, as we get into more edge
cases, the solution will become more complicated (e.g. providing more APIs
for users to specify their intended strategy) while yielding less additional
benefit (because these scenarios are less common).

Also, note that we can support the solution described above without
throwing away any public API currently proposed in FL

Re: [VOTE] FLIP-309: Support using larger checkpointing interval when source is processing backlog

2023-07-01 Thread Dong Lin
Hi Chesnay, can you put your comments in the discussion thread, so that we
can continue the technical discussion there?


Re: [VOTE] FLIP-321: introduce an API deprecation process

2023-07-01 Thread Dong Lin
Thanks for the FLIP.

+1 (binding)

On Fri, Jun 30, 2023 at 5:39 PM Becket Qin  wrote:

> Hi folks,
>
> I'd like to start the VOTE for FLIP-321[1] which proposes to introduce an
> API deprecation process to Flink. The discussion thread for the FLIP can be
> found here[2].
>
> The vote will be open until at least July 4, following the consensus voting
> process.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process
> [2] https://lists.apache.org/thread/vmhzv8fcw2b33pqxp43486owrxbkd5x9
>


Re: [VOTE] FLIP-309: Support using larger checkpointing interval when source is processing backlog

2023-07-01 Thread Dong Lin
Hi Chesnay,

Thank you for your comments and I would be happy to discuss together to
find a solution.

I just want to note that the discussion thread for this FLIP has been open
for almost two months for everyone to leave comments. I would really
appreciate it if in the future you could provide comments earlier in the
discussion thread, so that I (and probably other contributors) have the
chance to address your concerns and reach consensus sooner rather than
later. I am hoping we can be considerate of each other and help the
community be more productive.

Thanks,
Dong

On Fri, Jun 30, 2023 at 11:18 PM Chesnay Schepler 
wrote:

> -1 (binding)
>
> I feel like this FLIP needs a bit more time in the oven.
>
> It seems to be very light on actual details; you can summarize the
> entire changes section as "The enumerator calls this method and then
> another checkpoint interval is used."
> I would love to know how this is wired into the triggering of
> checkpoints, what the behavior is with multiple sources, if a sink is
> allowed to set this at any point or just once, what the semantics of a
> "backlog" are for sources other than Hybrid/ MySQL CDC (because catching
> up after a failover is a common enough pattern), whether/how this
> information could also be interesting for the scheduler (because we may
> want to avoid rescalings during the backlog processing), whether the
> backlog processing should be exposed as a metric for users (or for that
> matter, how we inform users at all that we're using a different
> checkpoint interval at this time).
>
> Following my discussion with Piotr and Stefan I'm also not sure how
> future-proof the proposed API really is. Already I feel like the name
> "setIsProcessingBacklog()" is rather specific for the state of the
> source (making it technically wrong to call it in other situations like
> being backpressured (again, depending on what "backlog processing" even
> means)), while not being clear on what this actually results in. The
> javadocs don't even mention the checkpointing interval at all, but
> instead reference downstream optimizations that, afaict, aren't
> mentioned in the FLIP.
>
> I'd be very hesitant with marking it as public from the get-go. Ideally
> it would maybe even be added as a separate interface (somehow).
>
> On 30/06/2023 16:37, Piotr Nowojski wrote:
> > Hey,
> >
> > Sorry to disturb this voting, but after discussing this thoroughly with
> > Chesnay and Stefan Richter I have to vote:
> >   -1 (binding)
> > mainly to suspend the current voting thread. Please take a look at my
> mail
> > at dev mailing list.
> >
> > Best,
> > Piotrek
> >
> > czw., 29 cze 2023 o 14:59 feng xiangyu 
> napisał(a):
> >
> >> +1 (non-binding)
> >>
> >> Best,
> >> Xiangyu
> >>
> >> yuxia  于2023年6月29日周四 20:44写道:
> >>
> >>> +1 (binding)
> >>>
> >>> Best regards,
> >>> Yuxia
> >>>
> >>> - 原始邮件 -
> >>> 发件人: "Yuepeng Pan" 
> >>> 收件人: "dev" 
> >>> 发送时间: 星期四, 2023年 6 月 29日 下午 8:21:14
> >>> 主题: Re: [VOTE] FLIP-309: Support using larger checkpointing interval
> when
> >>> source is processing backlog
> >>>
> >>> +1  non-binding.
> >>>
> >>>
> >>> Best.
> >>> Yuepeng Pan
> >>>
> >>>
> >>>  Replied Message 
> >>> | From | Jingsong Li |
> >>> | Date | 06/29/2023 13:25 |
> >>> | To | dev |
> >>> | Cc | flink.zhouyunfeng |
> >>> | Subject | Re: [VOTE] FLIP-309: Support using larger checkpointing
> >>> interval when source is processing backlog |
> >>> +1 binding
> >>>
> >>> On Thu, Jun 29, 2023 at 11:03 AM Dong Lin  wrote:
>  Hi all,
> 
>  We would like to start the vote for FLIP-309: Support using larger
>  checkpointing interval when source is processing backlog [1]. This
> FLIP
> >>> was
>  discussed in this thread [2].
> 
>  Flink 1.18 release will feature freeze on July 11. We hope to make
> this
>  feature available in Flink 1.18.
> 
>  The vote will be open until at least July 4th (at least 72 hours),
> >>> following
>  the consensus voting process.
> 
>  Cheers,
>  Yunfeng and Dong
> 
>  [1]
> 
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>  [2] https://lists.apache.org/thread/l1l7f30h7zldjp6ow97y70dcthx7tl37
>
>
>