Maybe it is a good idea to remove storing the blobs in the working
directory from this FLIP and to address the problems with doing this as a
follow-up. This will keep the FLIP narrowly scoped and faster to implement.
I will update the FLIP and move the blob storage part to the follow-up
section.

Cheers,
Till

On Thu, Dec 16, 2021 at 10:59 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi David,
>
> I think such a utility can be helpful. I would suggest adding something
> like this once it is needed by a component.
>
> Currently, I think only the BlobServer might be susceptible to this
> problem because we don't fsync the written bytes and don't use an atomic
> rename operation afterwards. If we change this, then I think we should no
> longer be affected by this problem. For the BlobStore we have a detection
> mechanism in place that ensures that you download the correct blob, using
> a MessageDigest. For the BlobCache we should probably add a check that the
> locally stored file has the expected MessageDigest and, if not, delete the
> file and refetch it from the BlobServer/BlobStore.
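>
> Just to sketch what I have in mind (illustrative only, not the actual
> BlobServer code; the class and method names are made up):
>
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.*;
>
> final class AtomicFileWriter {
>
>     // Write the bytes to a temporary sibling file first, force them to
>     // disk and then atomically rename the file to its final location, so
>     // readers never observe a partially written file.
>     static void write(Path target, byte[] contents) throws IOException {
>         Path tmp = target.resolveSibling(target.getFileName() + ".pending");
>         try (FileChannel channel = FileChannel.open(
>                 tmp, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
>             channel.write(ByteBuffer.wrap(contents));
>             channel.force(true); // fsync the written bytes
>         }
>         Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
>     }
> }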
>
> The RocksDB working directory will be cleaned up with every process
> restart and the local state directory is not used across process restarts
> at the moment.
>
> Cheers,
> Till
>
> On Thu, Dec 16, 2021 at 9:13 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Till,
>>
>> thanks for drafting this FLIP, I think it's really a valuable improvement.
>>
>> Agreed with Yang that the YARN / k8s implementation should be out of scope
>> of this FLIP. Just a few notes on the possible integrations:
>>
>> For k8s, I think we can also benefit from this FLIP without a StatefulSet.
>> If the pod crashes for some reason, it will be restarted -> it's still on
>> the same node, but it loses the state. This could be addressed by attaching
>> an ephemeral volume to the container [1] (an emptyDir volume survives
>> container restarts within the pod). This sits somewhere in between the
>> current state and the persistent-volume approach (which is where you need a
>> StatefulSet), which could be expensive (depending on the infrastructure).
>>
>> Example:
>>
>> apiVersion: v1
>> kind: Pod
>> metadata:
>>   name: test-pod
>> spec:
>>   containers:
>>   - image: ...
>>     name: test-container
>>     volumeMounts:
>>     - mountPath: /cache
>>       name: cache-volume
>>   volumes:
>>   - name: cache-volume
>>     emptyDir: {}
>>
>> For YARN, I don't think it's as simple as remembering prior locations. As
>> far as I remember, a "restart from failure" results in a new container
>> being created; the storage is tied to a container's lifecycle, and the
>> working directories are garbage collected right after the container FAILS /
>> FINISHES. We'd most likely have to leverage a new component (something
>> along the lines of how the shuffle services for YARN work) that runs
>> embedded in the NodeManager and allows you to externalize files for use
>> beyond the container's lifecycle, tying their lifecycle to the job instead.
>>
>> As for Chesnay's concern around corrupted files, are we sure that all
>> components can recover from a corrupted file? Could we, for example, have a
>> generic mechanism (CRC + file) that is reused by all the components writing
>> to the working directory?
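>>
>> Something along these lines, just to illustrate the idea (a rough sketch
>> with made-up names, using a CRC32 sidecar file):
>>
>> import java.io.IOException;
>> import java.nio.file.Files;
>> import java.nio.file.Path;
>> import java.util.Optional;
>> import java.util.zip.CRC32;
>>
>> final class ChecksummedFile {
>>
>>     // Store the payload together with a sidecar file containing its CRC32
>>     // so that readers can detect torn or corrupted writes.
>>     static void write(Path file, byte[] payload) throws IOException {
>>         CRC32 crc = new CRC32();
>>         crc.update(payload);
>>         Files.write(file, payload);
>>         Files.writeString(crcFileFor(file), Long.toString(crc.getValue()));
>>     }
>>
>>     // Return the payload only if the checksum matches; otherwise the
>>     // caller should delete the file and fall back to the ground truth.
>>     static Optional<byte[]> read(Path file) throws IOException {
>>         Path crcFile = crcFileFor(file);
>>         if (!Files.exists(file) || !Files.exists(crcFile)) {
>>             return Optional.empty();
>>         }
>>         byte[] payload = Files.readAllBytes(file);
>>         CRC32 crc = new CRC32();
>>         crc.update(payload);
>>         long expected = Long.parseLong(Files.readString(crcFile).trim());
>>         return crc.getValue() == expected
>>                 ? Optional.of(payload)
>>                 : Optional.empty();
>>     }
>>
>>     private static Path crcFileFor(Path file) {
>>         return file.resolveSibling(file.getFileName() + ".crc");
>>     }
>> }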
>>
>> Other than that, I really like the FLIP and I'm looking forward to having
>> this feature in Flink. +1
>>
>> [1] https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/
>>
>> Best,
>> D.
>>
>> On Thu, Dec 16, 2021 at 3:10 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>
>> > I am afraid that creating a dedicated StatefulSet for each TaskManager is
>> > too expensive and that using a shared StatefulSet for all the TaskManagers
>> > is not flexible enough. Maybe TaskManager pods with a proper restart
>> > policy could benefit from this FLIP, but we might need to tackle some
>> > other issues, e.g. duplicated registration.
>> >
>> > All in all, this is out of the scope of this FLIP. I agree we could leave
>> > it for future FLIPs.
>> >
>> > I have no more concerns. +1
>> >
>> >
>> > Best,
>> > Yang
>> >
>> > Till Rohrmann <trohrm...@apache.org> wrote on Wed, Dec 15, 2021 at 19:06:
>> >
>> > > This is true. But this is not a new problem, and I think Flink is
>> > > already susceptible to it. One solution for this concrete case could be
>> > > that the BlobServer stores some checksums and validates the file before
>> > > serving it to the TM.
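>> > >
>> > > Roughly like this (only a sketch; the class/method names and the choice
>> > > of digest are made up, not the actual BlobServer code):
>> > >
>> > > import java.io.IOException;
>> > > import java.nio.file.Files;
>> > > import java.nio.file.Path;
>> > > import java.security.MessageDigest;
>> > > import java.security.NoSuchAlgorithmException;
>> > >
>> > > final class BlobIntegrity {
>> > >
>> > >     // Recompute the digest of the stored file and compare it against
>> > >     // the digest recorded when the blob was written. Serve the file
>> > >     // only if they match; otherwise delete it and re-download the blob.
>> > >     static boolean isValid(Path blobFile, byte[] expectedDigest)
>> > >             throws IOException, NoSuchAlgorithmException {
>> > >         MessageDigest md = MessageDigest.getInstance("SHA-256");
>> > >         byte[] actual = md.digest(Files.readAllBytes(blobFile));
>> > >         return MessageDigest.isEqual(expectedDigest, actual);
>> > >     }
>> > > }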
>> > >
>> > > Cheers,
>> > > Till
>> > >
>> > > On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <ches...@apache.org>
>> > > wrote:
>> > >
>> > > > The issue with corrupted files is that some of them aren't read by the
>> > > > component that stores them.
>> > > > For example, a file can be corrupted in the blob server of the JM, but
>> > > > the corruption will only be noticed by the TaskExecutor.
>> > > >
>> > > > On 15/12/2021 11:36, Till Rohrmann wrote:
>> > > > > Thanks everyone for your feedback. Let me try to address it by
>> > > > > grouping some of the individual comments:
>> > > > >
>> > > > > ### Will this feature work for native Yarn and K8s deployments?
>> > > > >
>> > > > > The working directory is an optional feature that can be used to
>> > > > > recover additional information. You can think of it like a cache. If
>> > > > > the working directory is there, then Flink can do certain things a
>> > > > > bit faster, but in the worst case it will have to retrieve the
>> > > > > required information from the JobManager or persistent storage.
>> > > > >
>> > > > > In order to make it work with native Yarn and K8s, we would have to
>> > > > > change these modes slightly. First of all, we would have to be able
>> > > > > to map working directories to processes and then set deterministic
>> > > > > resource ids for the processes. For K8s this could easily be achieved
>> > > > > by using a StatefulSet as the deployment mechanism for TaskExecutors.
>> > > > > For Yarn, we would probably have to remember the prior locations of a
>> > > > > process. Both things are potential follow-ups that I don't want to
>> > > > > tackle in this FLIP.
>> > > > >
>> > > > > If one of the modes configures the working directory to be on a full
>> > > > > or broken disk, then the process will fail. I think this is not all
>> > > > > that different from the current state where some things in Flink will
>> > > > > fail if they picked the wrong/full temporary directory (e.g. blob
>> > > > > storage directory).
>> > > > >
>> > > > > ### Cleanup
>> > > > >
>> > > > > The working directory will be cleaned up if the Flink process is
>> > > > > gracefully shut down. This means that the JobManager process will
>> > > > > clean it up if it runs in application mode and the job is terminated.
>> > > > > SIGTERM and SIGKILL signals will be treated as an ungraceful shutdown
>> > > > > and therefore they won't clean up the working directory. This means
>> > > > > that we probably also need a graceful way for shutting TaskManager
>> > > > > processes down in the future because right now they are in most cases
>> > > > > killed in order to shut them down. If the user uses the tmp directory,
>> > > > > then any left-over working directories will be cleaned up with the
>> > > > > next system restart. This is somewhat similar to how RocksDB's working
>> > > > > directory is currently cleaned up as well.
>> > > > >
>> > > > > ### Corrupted files
>> > > > >
>> > > > > The working directory itself won't give you any guarantees. It will
>> > > > > be the responsibility of the component that uses the working directory
>> > > > > to make sure that it can deal with corrupted files. E.g. if the
>> > > > > component cannot read the file, then it should delete it and fall back
>> > > > > to the remote storage/ground truth to retrieve the required
>> > > > > information.
>> > > > >
>> > > > > I hope this answers your questions. Let me know if you have more
>> > > > > feedback.
>> > > > >
>> > > > > Cheers,
>> > > > > Till
>> > > > >
>> > > > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <liujiangangp...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > >> I like the idea. It can reuse the disk to do many things. Isn't it
>> > > > >> only for internal failover? If not, the cleanup may be a problem.
>> > > > >> Also, many resource components have their own disk scheduling
>> > > > >> strategy.
>> > > > >>
>> > > > >> Chesnay Schepler <ches...@apache.org> wrote on Sun, Dec 12, 2021 at 19:59:
>> > > > >>
>> > > > >>> How do you intend to handle corrupted files, in particular due to
>> > > > >>> process crashes during a write?
>> > > > >>> Will all writes to a cached directory append some suffix (e.g.,
>> > > > >>> ".pending") and do a rename?
>> > > > >>>
>> > > > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
>> > > > >>>> Hi everyone,
>> > > > >>>>
>> > > > >>>> I would like to start a discussion about introducing an explicit
>> > > > >>>> working directory for Flink processes that can be used to store
>> > > > >>>> information [1]. By default this working directory will reside in
>> > > > >>>> the temporary directory of the node Flink runs on. However, if
>> > > > >>>> configured to reside on a persistent volume, then this information
>> > > > >>>> can be used to recover from process/node failures. Moreover, such
>> > > > >>>> a working directory can be used to consolidate some of our other
>> > > > >>>> directories Flink creates under /tmp (e.g. blobStorage, RocksDB
>> > > > >>>> working directory).
>> > > > >>>>
>> > > > >>>> Here is a draft PR that outlines the required changes [2].
>> > > > >>>>
>> > > > >>>> Looking forward to your feedback.
>> > > > >>>>
>> > > > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
>> > > > >>>> [2] https://github.com/apache/flink/pull/18083
>> > > > >>>>
>> > > > >>>> Cheers,
>> > > > >>>> Till
>> > > > >>>>
>> > > > >>>
>> > > >
>> > > >
>> > >
>> >
>>
>
