Thanks, makes sense. What about using the config mechanism? We're collecting and distributing some environment variables at startup; would it also work to include a timestamp with those?
Also, would you be interested in a patch to note the caveat about union state metadata in the documentation?

Jacob

On Tue, Mar 17, 2020 at 2:51 AM Till Rohrmann <[email protected]> wrote:

> Did I understand you correctly that you use the union state to synchronize
> the per-partition state across all operators in order to obtain a global
> overview? If this is the case, then this will only work in case of a
> failover. Only then are all operators restarted with the union of
> all operators' state. If the job never fails, there is never
> an exchange of state.
>
> If you really need a global view over your data, then you need to create
> an operator with a parallelism of 1 which records all the different
> timestamps.
>
> Another idea could be to use the broadcast state pattern [1]. You could
> have an operator which extracts the java.time.Instant values and outputs
> them as a side output and simply forwards the records on the main output.
> Then you could use the side output as the broadcast input and the main
> output as the normal input into the broadcast operator. The problem with
> this approach might be that you don't get order guarantees between the
> side and the main output.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> Cheers,
> Till
>
> On Tue, Mar 17, 2020 at 2:29 AM Jacob Sevart <[email protected]> wrote:
>
>> Thanks! That would do it. I've disabled the operator for now.
>>
>> The purpose was to know the age of the job's state, so that we could
>> consider its output in terms of how much context it knows.
>> Regular state seemed insufficient because partitions might see their first
>> traffic at different times.
>>
>> How would you go about implementing something like that?
>>
>> On Mon, Mar 16, 2020 at 1:54 PM Till Rohrmann <[email protected]>
>> wrote:
>>
>>> Hi Jacob,
>>>
>>> I think you are running into some deficiencies of Flink's union state
>>> here. The problem is that for every entry in your list state, Flink stores
>>> a separate offset (a long value). The reason for this behaviour is that we
>>> use the same state implementation for the union state as well as for the
>>> split state. For the latter, the offset information is required to split
>>> the state in case of changing the parallelism of your job.
>>>
>>> My recommendation would be to try to get rid of union state
>>> altogether. The union state has primarily been introduced to checkpoint some
>>> source implementations and might become deprecated due to performance
>>> problems once these sources can be checkpointed differently.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sat, Mar 14, 2020 at 3:23 AM Jacob Sevart <[email protected]> wrote:
>>>
>>>> Oh, I should clarify that's 43MB per partition, so with 48 partitions
>>>> it explains my 2GB.
>>>>
>>>> On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart <[email protected]> wrote:
>>>>
>>>>> Running *Checkpoints.loadCheckpointMetadata* under a debugger, I
>>>>> found something:
>>>>> *subtaskState.managedOperatorState[0].stateNameToPartitionOffsets("startup-times").offsets.value*
>>>>> weighs 43MB (5.3 million longs).
>>>>>
>>>>> "startup-times" is an operator state of mine (union list of
>>>>> java.time.Instant). I see a way to end up with fewer items in the list,
>>>>> but I'm not sure how the actual size is related to the number of
>>>>> offsets. Can you elaborate on that?
>>>>>
>>>>> Incidentally, 42.5MB is the number I got out of
>>>>> https://issues.apache.org/jira/browse/FLINK-14618.
>>>>> So I think my two problems are closely related.
>>>>>
>>>>> Jacob
>>>>>
>>>>> On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> As Gordon said, the metadata will contain the ByteStreamStateHandle.
>>>>>> When the ByteStreamStateHandle is written out, its handle name is
>>>>>> written out too, and that name is a path (as you saw). The
>>>>>> ByteStreamStateHandle is created when the state size is smaller than
>>>>>> `state.backend.fs.memory-threshold` (default is 1024).
>>>>>>
>>>>>> If you want to verify this, you can refer to the unit test
>>>>>> `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the
>>>>>> metadata. You will find that there are many `ByteStreamStateHandle`
>>>>>> instances, and their names are the strings you saw in the metadata.
>>>>>>
>>>>>> Best,
>>>>>> Congxian
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 6, 2020 at 3:57 AM Jacob Sevart <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks, I will monitor that thread.
>>>>>>>
>>>>>>> I'm having a hard time following the serialization code, but if you
>>>>>>> know anything about the layout, tell me if this makes sense. What I see
>>>>>>> in the hex editor is, first, many HDFS paths. Then gigabytes of
>>>>>>> unreadable data. Then finally another HDFS path at the end.
>>>>>>>
>>>>>>> If it is putting state in there, under normal circumstances, does it
>>>>>>> make sense that it would be interleaved with metadata? I would expect
>>>>>>> all the metadata to come first, and then state.
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jacob,
>>>>>>>>
>>>>>>>> As I said previously, I am not 100% sure what can be causing this
>>>>>>>> behavior, but there is a related thread here:
>>>>>>>>
>>>>>>>> https://urldefense.proofpoint.com/v2/url?u=https-3A__lists.apache.org_thread.html_r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d-2540-253Cuser.flink.apache.org-253E&d=DwIBaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus&s=P3Xd0IFKJTDIG2MMeP-hOSfY4ohoCEUMQEJhvGecSlI&e=
>>>>>>>>
>>>>>>>> where you can re-post your problem and monitor for answers.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart <[email protected]>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > Kostas and Gordon,
>>>>>>>> >
>>>>>>>> > Thanks for the suggestions! I'm on RocksDB. We don't have that
>>>>>>>> > setting configured so it should be at the default 1024b. This is the
>>>>>>>> > full "state.*" section showing in the JobManager UI.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > Jacob
>>>>>>>> >
>>>>>>>> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai <[email protected]> wrote:
>>>>>>>> >>
>>>>>>>> >> Hi Jacob,
>>>>>>>> >>
>>>>>>>> >> Apart from what Klou already mentioned, one slightly possible
>>>>>>>> >> reason:
>>>>>>>> >>
>>>>>>>> >> If you are using the FsStateBackend, it is also possible that
>>>>>>>> >> your state is small enough to be considered to be stored inline
>>>>>>>> >> within the metadata file.
>>>>>>>> >> That is governed by the "state.backend.fs.memory-threshold"
>>>>>>>> >> configuration, with a default value of 1024 bytes, or can also be
>>>>>>>> >> configured with the `fileStateSizeThreshold` argument when
>>>>>>>> >> constructing the `FsStateBackend`.
>>>>>>>> >> The purpose of that threshold is to ensure that the backend does
>>>>>>>> >> not create a large amount of very small files, where potentially
>>>>>>>> >> the file pointers are actually larger than the state itself.
>>>>>>>> >>
>>>>>>>> >> Cheers,
>>>>>>>> >> Gordon
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas <[email protected]> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Hi Jacob,
>>>>>>>> >>>
>>>>>>>> >>> Could you specify which StateBackend you are using?
>>>>>>>> >>>
>>>>>>>> >>> The reason I am asking is that, from the documentation in [1]:
>>>>>>>> >>>
>>>>>>>> >>> "Note that if you use the MemoryStateBackend, metadata and
>>>>>>>> >>> savepoint state will be stored in the _metadata file. Since it is
>>>>>>>> >>> self-contained, you may move the file and restore from any
>>>>>>>> >>> location."
>>>>>>>> >>>
>>>>>>>> >>> I am also cc'ing Gordon who may know a bit more about state
>>>>>>>> >>> formats.
>>>>>>>> >>>
>>>>>>>> >>> I hope this helps,
>>>>>>>> >>> Kostas
>>>>>>>> >>>
>>>>>>>> >>> [1]
>>>>>>>> >>> https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.6_ops_state_savepoints.html&d=DwIBaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus&s=fw0c-Ct21HHJv4MzZRicIaltqHLQOrNvqchzNgCdwkA&e=
>>>>>>>> >>>
>>>>>>>> >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart <[email protected]>
>>>>>>>> >>> wrote:
>>>>>>>> >>> >
>>>>>>>> >>> > Per the documentation:
>>>>>>>> >>> >
>>>>>>>> >>> > "The meta data file of a Savepoint contains (primarily)
>>>>>>>> >>> > pointers to all files on stable storage that are part of the
>>>>>>>> >>> > Savepoint, in form of absolute paths."
>>>>>>>> >>> >
>>>>>>>> >>> > I somehow have a _metadata file that's 1.9GB. Running strings
>>>>>>>> >>> > on it I find 962 strings, most of which look like HDFS paths,
>>>>>>>> >>> > which leaves a lot of that file-size unexplained.
>>>>>>>> >>> > What else is in there, and how exactly could this be happening?
>>>>>>>> >>> >
>>>>>>>> >>> > We're running 1.6.
>>>>>>>> >>> >
>>>>>>>> >>> > Jacob
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > --
>>>>>>>> > Jacob Sevart
>>>>>>>> > Software Engineer, Safety
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Jacob Sevart
>>>>>>> Software Engineer, Safety
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Jacob Sevart
>>>>> Software Engineer, Safety
>>>>>
>>>>
>>>>
>>>> --
>>>> Jacob Sevart
>>>> Software Engineer, Safety
>>>>
>>>
>>
>> --
>> Jacob Sevart
>> Software Engineer, Safety
>>
>

--
Jacob Sevart
Software Engineer, Safety
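
Note on the sizes quoted in this thread: the figures appear consistent with one offset of type long stored per union-state entry. The sketch below is only a sanity check of that arithmetic. It assumes 8 bytes per long and uses the 5.3 million offsets and 48 partitions reported above; the variable names are illustrative and not taken from Flink.

```python
# Rough size check for the union-state offsets discussed in the thread.
# Assumption: each offset is a Java long, i.e. 8 bytes.
BYTES_PER_LONG = 8

offsets_per_partition = 5_300_000  # "5.3 million longs" seen in the debugger
partitions = 48                    # number of partitions reported in the thread

per_partition_bytes = offsets_per_partition * BYTES_PER_LONG
total_bytes = per_partition_bytes * partitions

per_partition_mb = per_partition_bytes / 1e6   # decimal megabytes
total_gb = total_bytes / 1e9                   # decimal gigabytes
total_gib = total_bytes / 2**30                # binary gibibytes

print(f"{per_partition_mb:.1f} MB per partition")
print(f"{total_gb:.2f} GB total ({total_gib:.2f} GiB)")
```

Under these assumptions the offsets alone come to roughly 42 MB per partition and about 2 GB across all partitions (about 1.9 in binary units), which is in line with the 43MB, 2GB and 1.9GB figures mentioned in the messages above.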
