Thanks, makes sense.

What about using the config mechanism? We're collecting and distributing
some environment variables at startup. Would it also work to include a
timestamp with that?
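
A minimal sketch of that idea, assuming the startup timestamp is added as one more string entry to the map of variables collected at startup. The class name and the `job.startup-time` key are hypothetical, and how the map is then distributed to the operators is not shown. Under this assumption the value records when the collecting code ran, so it would be regenerated whenever that code runs again.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class StartupConfig {
    // Hypothetical key name, chosen for this sketch only.
    static final String STARTUP_KEY = "job.startup-time";

    // Copies the given variables and adds the startup instant as an ISO-8601 string.
    static Map<String, String> withStartupTime(Map<String, String> env, Instant startedAt) {
        Map<String, String> config = new HashMap<>(env);
        config.put(STARTUP_KEY, startedAt.toString());
        return config;
    }

    // Parses the timestamp back out and returns the time elapsed up to 'now'.
    static Duration age(Map<String, String> config, Instant now) {
        return Duration.between(Instant.parse(config.get(STARTUP_KEY)), now);
    }

    public static void main(String[] args) {
        Map<String, String> config = withStartupTime(System.getenv(), Instant.now());
        System.out.println("age so far: " + age(config, Instant.now()));
    }
}
```

Storing the instant as a string keeps the map homogeneous, so it can travel through any string-to-string configuration channel.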

Also, would you be interested in a patch to note the caveat about union
state metadata in the documentation?

Jacob

On Tue, Mar 17, 2020 at 2:51 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Did I understand you correctly that you use the union state to synchronize
> the per-partition state across all operators in order to obtain a global
> overview? If so, this will only work in case of a failover. Only then are
> all operators restarted with the union of all operators' state. If the job
> never fails, there is never an exchange of state.
>
> If you really need a global view over your data, then you need to create
> an operator with a parallelism of 1 which records all the different
> timestamps.
>
> Another idea could be to use the broadcast state pattern [1]. You could
> have an operator which extracts the java.time.Instant and outputs it as a
> side output and simply forwards the records on the main output. Then you
> could use the side output as the broadcast input and the main output as the
> normal input into the broadcast operator. The problem with this approach
> might be that you don't get order guarantees between the side and the main
> output.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> Cheers,
> Till
>
> On Tue, Mar 17, 2020 at 2:29 AM Jacob Sevart <jsev...@uber.com> wrote:
>
>> Thanks! That would do it. I've disabled the operator for now.
>>
>> The purpose was to know the age of the job's state, so that we could
>> consider its output in terms of how much context it knows. Regular state
>> seemed insufficient because partitions might see their first traffic at
>> different times.
>>
>> How would you go about implementing something like that?
>>
>> On Mon, Mar 16, 2020 at 1:54 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Jacob,
>>>
>>> I think you are running into some deficiencies of Flink's union state
>>> here. The problem is that for every entry in your list state, Flink stores
>>> a separate offset (a long value). The reason for this behaviour is that we
>>> use the same state implementation for the union state as well as for the
>>> split state. For the latter, the offset information is required to split
>>> the state in case of changing the parallelism of your job.
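
The per-entry offset described above can be illustrated with a simplified model. This is only a sketch, not Flink's actual serialization code: the `OffsetSketch` and `Snapshot` names are hypothetical, and entries are written as strings for brevity. It shows why the offset array grows with the number of list entries: one long is recorded for each entry so that the byte stream could later be cut at entry boundaries.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

public class OffsetSketch {
    // Serialized entries plus the start offset of each entry in the byte stream.
    static final class Snapshot {
        final byte[] data;
        final long[] offsets;

        Snapshot(byte[] data, long[] offsets) {
            this.data = data;
            this.offsets = offsets;
        }
    }

    // Writes every entry to one stream and remembers where each entry begins.
    static Snapshot snapshot(List<String> entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        long[] offsets = new long[entries.size()];
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < entries.size(); i++) {
                offsets[i] = out.size(); // bytes written so far = start of entry i
                out.writeUTF(entries.get(i));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Snapshot(bytes.toByteArray(), offsets);
    }

    public static void main(String[] args) {
        Snapshot s = snapshot(Arrays.asList("a", "bcd", "e"));
        System.out.println("offsets: " + Arrays.toString(s.offsets));
        System.out.println("offset metadata bytes: " + (long) s.offsets.length * Long.BYTES);
    }
}
```

In this model the offset metadata costs 8 bytes per entry regardless of how small the entries themselves are.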
>>>
>>> My recommendation would be to try to get rid of union state
>>> altogether. The union state was primarily introduced to checkpoint some
>>> source implementations and might become deprecated due to performance
>>> problems once these sources can be checkpointed differently.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sat, Mar 14, 2020 at 3:23 AM Jacob Sevart <jsev...@uber.com> wrote:
>>>
>>>> Oh, I should clarify that's 43MB per partition, so with 48 partitions
>>>> it explains my 2GB.
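
The arithmetic behind these figures can be checked with a short sketch. It uses only numbers reported in this thread (5.3 million longs per partition, 48 partitions) and assumes every partition carries the same number of offsets; the class and method names are hypothetical.

```java
public class OffsetSizeEstimate {
    // Bytes needed to store 'entries' long offsets in each of 'partitions' partitions.
    static long offsetBytes(long entries, int partitions) {
        return entries * Long.BYTES * partitions;
    }

    public static void main(String[] args) {
        long perPartition = offsetBytes(5_300_000L, 1);
        long total = offsetBytes(5_300_000L, 48);
        System.out.println("per partition bytes: " + perPartition);
        System.out.println("total bytes: " + total);
    }
}
```

That is 42,400,000 bytes per partition and 2,035,200,000 bytes in total, or roughly 1.9 GiB, which is in line with the 1.9GB _metadata file reported at the start of the thread.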
>>>>
>>>> On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart <jsev...@uber.com> wrote:
>>>>
>>>>> Running *Checkpoints.loadCheckpointMetadata* under a debugger, I
>>>>> found something:
>>>>> *subtaskState.managedOperatorState[0].stateNameToPartitionOffsets("startup-times").offsets.value*
>>>>> weighs 43MB (5.3 million longs).
>>>>>
>>>>> "startup-times" is an operator state of mine (union list of
>>>>> java.time.Instant). I see a way to end up with fewer items in the list, but I'm
>>>>> not sure how the actual size is related to the number of offsets. Can you
>>>>> elaborate on that?
>>>>>
>>>>> Incidentally, 42.5MB is the number I got out of
>>>>> https://issues.apache.org/jira/browse/FLINK-14618.
>>>>> So I think my two problems are closely related.
>>>>>
>>>>> Jacob
>>>>>
>>>>> On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu <qcx978132...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> As Gordon said, the metadata will contain the ByteStreamStateHandle.
>>>>>> When a ByteStreamStateHandle is written out, its handle name is written
>>>>>> too, and that name is a path (as you saw). A ByteStreamStateHandle is
>>>>>> created when the state size is smaller than
>>>>>> `state.backend.fs.memory-threshold` (default is 1024).
>>>>>>
>>>>>> If you want to verify this, you can refer to the unit test
>>>>>> `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the
>>>>>> metadata. You will find many `ByteStreamStateHandle` instances, and
>>>>>> their names are the strings you saw in the metadata.
>>>>>>
>>>>>> Best,
>>>>>> Congxian
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 6, 2020 at 3:57 AM Jacob Sevart <jsev...@uber.com> wrote:
>>>>>>
>>>>>>> Thanks, I will monitor that thread.
>>>>>>>
>>>>>>> I'm having a hard time following the serialization code, but if you
>>>>>>> know anything about the layout, tell me if this makes sense. What I
>>>>>>> see in the hex editor is, first, many HDFS paths. Then gigabytes of
>>>>>>> unreadable data. Then finally another HDFS path at the end.
>>>>>>>
>>>>>>> If it is putting state in there, under normal circumstances, does it
>>>>>>> make sense that it would be interleaved with metadata? I would expect
>>>>>>> all the metadata to come first, and then state.
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas <kklou...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jacob,
>>>>>>>>
>>>>>>>> As I said previously I am not 100% sure what can be causing this
>>>>>>>> behavior, but this is a related thread here:
>>>>>>>>
>>>>>>>> https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E
>>>>>>>>
>>>>>>>> You can re-post your problem there and monitor it for answers.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart <jsev...@uber.com>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > Kostas and Gordon,
>>>>>>>> >
>>>>>>>> > Thanks for the suggestions! I'm on RocksDB. We don't have that
>>>>>>>> > setting configured, so it should be at the default of 1024 bytes.
>>>>>>>> > This is the full "state.*" section showing in the JobManager UI.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > Jacob
>>>>>>>> >
>>>>>>>> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>>>>>> >>
>>>>>>>> >> Hi Jacob,
>>>>>>>> >>
>>>>>>>> >> Apart from what Klou already mentioned, one other, less likely,
>>>>>>>> >> possible reason:
>>>>>>>> >>
>>>>>>>> >> If you are using the FsStateBackend, it is also possible that your
>>>>>>>> >> state is small enough to be stored inline within the metadata file.
>>>>>>>> >> That is governed by the "state.backend.fs.memory-threshold"
>>>>>>>> >> configuration, with a default value of 1024 bytes, or can also be
>>>>>>>> >> configured with the `fileStateSizeThreshold` argument when
>>>>>>>> >> constructing the `FsStateBackend`.
>>>>>>>> >> The purpose of that threshold is to ensure that the backend does
>>>>>>>> >> not create a large number of very small files, where the file
>>>>>>>> >> pointers are potentially larger than the state itself.
>>>>>>>> >>
>>>>>>>> >> Cheers,
>>>>>>>> >> Gordon
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Hi Jacob,
>>>>>>>> >>>
>>>>>>>> >>> Could you specify which StateBackend you are using?
>>>>>>>> >>>
>>>>>>>> >>> The reason I am asking is that, from the documentation in [1]:
>>>>>>>> >>>
>>>>>>>> >>> "Note that if you use the MemoryStateBackend, metadata and savepoint
>>>>>>>> >>> state will be stored in the _metadata file. Since it is
>>>>>>>> >>> self-contained, you may move the file and restore from any location."
>>>>>>>> >>>
>>>>>>>> >>> I am also cc'ing Gordon who may know a bit more about state formats.
>>>>>>>> >>>
>>>>>>>> >>> I hope this helps,
>>>>>>>> >>> Kostas
>>>>>>>> >>>
>>>>>>>> >>> [1]
>>>>>>>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>>>>>>>> >>>
>>>>>>>> >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart <jsev...@uber.com> wrote:
>>>>>>>> >>> >
>>>>>>>> >>> > Per the documentation:
>>>>>>>> >>> >
>>>>>>>> >>> > "The meta data file of a Savepoint contains (primarily) pointers
>>>>>>>> >>> > to all files on stable storage that are part of the Savepoint, in
>>>>>>>> >>> > form of absolute paths."
>>>>>>>> >>> >
>>>>>>>> >>> > I somehow have a _metadata file that's 1.9GB. Running strings on it
>>>>>>>> >>> > I find 962 strings, most of which look like HDFS paths, which
>>>>>>>> >>> > leaves a lot of that file size unexplained. What else is in there,
>>>>>>>> >>> > and how exactly could this be happening?
>>>>>>>> >>> >
>>>>>>>> >>> > We're running 1.6.
>>>>>>>> >>> >
>>>>>>>> >>> > Jacob
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > --
>>>>>>>> > Jacob Sevart
>>>>>>>> > Software Engineer, Safety
>>>>>>>>

-- 
Jacob Sevart
Software Engineer, Safety
