Exception handling

2021-04-27 Thread Jacob Sevart
How do we get uncaught exceptions in operators to skip the problematic
messages, rather than crash the entire job? Is there an easier or less
mistake-prone way to do this than wrapping every operator method in
try/catch?

And what to do about Map? Since it has to return something, we're either
returning null and then catching it with a *.filter(Objects::nonNull)* in
the next operator, or converting it to FlatMap. FlatMap conversion is
annoying, because then we need to mock the Collector for testing.
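One way to avoid repeating try/catch in every operator is to centralize it in a single wrapper. The sketch below is plain Java with no Flink dependency. `SkipOnError` is a hypothetical name, and the `List<R> out` parameter stands in for Flink's `Collector`. The flatMap-style variant emits zero or one result, so a failed record produces no output at all, and no null has to be filtered out downstream.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical helper (not a Flink API): wraps a mapping function so that a record
 * whose mapping throws is skipped and reported instead of failing the caller.
 */
final class SkipOnError<T, R> {
    private final Function<T, R> mapper;
    private final Consumer<T> onFailure; // e.g. log or count the bad record

    SkipOnError(Function<T, R> mapper, Consumer<T> onFailure) {
        this.mapper = mapper;
        this.onFailure = onFailure;
    }

    /** Returns the mapped value, or empty if the mapper threw or returned null. */
    Optional<R> apply(T input) {
        try {
            return Optional.ofNullable(mapper.apply(input));
        } catch (RuntimeException e) {
            onFailure.accept(input);
            return Optional.empty();
        }
    }

    /** flatMap-style variant: adds zero or one result to `out`, as a Collector would receive. */
    void flatMap(T input, List<R> out) {
        apply(input).ifPresent(out::add);
    }
}
```

Because the output goes into an ordinary list in this sketch, the wrapped logic can be unit tested without mocking anything. In a real job, the same call could forward results to `Collector.collect` inside a flatMap function; that wiring is left out here.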

Obviously it would be best to sanitize inputs so that exceptions don't
occur, but we've recently encountered some setbacks in the game of
whack-a-mole with pathological messages, and are hoping to mitigate the
losses when these do occur.

Jacob


Re: "stepless" sliding windows?

2020-10-22 Thread Jacob Sevart
I think the issue is you have to specify a *time* interval for "step." It
would be nice to consider the preceding N minutes as of every message. You
can somewhat approximate that using a very small step.
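One caveat about the assigner quoted below, as we understand Flink's windowing: an element is only added to the windows returned by its own `assignWindows` call, and two `TimeWindow`s are equal only when both start and end match. A per-element window `[timestamp - size, timestamp)` would therefore collect only elements with that exact timestamp, not everything from the preceding N minutes. A per-event lookback can instead be kept as explicit state. The plain-Java sketch below uses hypothetical names, has no Flink dependency, and assumes timestamps arrive in non-decreasing order. It evicts old events on each arrival and returns the running sum as of that event.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical "stepless" sliding aggregate: on every event, returns the sum of values
 * whose timestamps fall in the half-open interval (timestamp - windowMillis, timestamp].
 * Assumes events arrive in non-decreasing timestamp order.
 */
final class SteplessSlidingSum {
    private static final class Event {
        final long timestamp;
        final long value;

        Event(long timestamp, long value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final long windowMillis;
    private final Deque<Event> events = new ArrayDeque<>();
    private long runningSum = 0;

    SteplessSlidingSum(long windowMillis) {
        if (windowMillis <= 0) throw new IllegalArgumentException("windowMillis must be positive");
        this.windowMillis = windowMillis;
    }

    /** Adds one event and returns the aggregate over the preceding windowMillis, as of this event. */
    long onEvent(long timestamp, long value) {
        if (!events.isEmpty() && timestamp < events.peekLast().timestamp) {
            throw new IllegalArgumentException("out-of-order event: " + timestamp);
        }
        events.addLast(new Event(timestamp, value));
        runningSum += value;
        // Evict events at or before the exclusive lower bound; the event just added always survives.
        long lowerBoundExclusive = timestamp - windowMillis;
        while (events.peekFirst().timestamp <= lowerBoundExclusive) {
            runningSum -= events.pollFirst().value;
        }
        return runningSum;
    }
}
```

In a Flink job, a buffer like this might live in keyed state of a process function, with out-of-order events handled via watermarks. That part is not shown, and the sketch simply rejects out-of-order input.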

On Thu, Oct 22, 2020 at 2:29 AM Danny Chan  wrote:

> The SLIDING window always triggers at each step, so what do you mean by
> "stepless"?
>
> On Wed, Oct 21, 2020 at 1:52 AM Alex Cruise wrote:
>
>> Whoops... as usual, posting led me to find some answers myself. Does this
>> make sense given my requirements?
>>
>> Thanks!
>>
>> private class MyWindowAssigner(val windowSize: Time) :
>>     WindowAssigner<Record, TimeWindow>() {
>>     @Suppress("UNCHECKED_CAST")
>>     private val trigger = CountTrigger.of<TimeWindow>(1) as Trigger<Record, TimeWindow>
>>
>>     override fun assignWindows(
>>         element: Record,
>>         timestamp: Long,
>>         context: WindowAssignerContext
>>     ): MutableCollection<TimeWindow> {
>>         return mutableListOf(TimeWindow(timestamp - windowSize.toMilliseconds(), timestamp))
>>     }
>>
>>     override fun getDefaultTrigger(env: StreamExecutionEnvironment?): Trigger<Record, TimeWindow> {
>>         return trigger
>>     }
>>
>>     override fun getWindowSerializer(executionConfig: ExecutionConfig?): TypeSerializer<TimeWindow> {
>>         return TimeWindow.Serializer()
>>     }
>>
>>     override fun isEventTime(): Boolean {
>>         return true
>>     }
>> }
>>
>>
>> On Tue, Oct 20, 2020 at 9:13 AM Alex Cruise  wrote:
>>
>>> Hey folks!
>>>
>>> I have an application that wants to use "stepless" sliding windows, i.e.
>>> we produce aggregates on every event. The windows need to be of a fixed
>>> size, but to have their start and end times update continuously, and I'd
>>> like to trigger on every event. Is this a bad idea? I've googled and read
>>> the docs extensively and haven't been able to identify built-in
>>> functionality or examples that map cleanly to my requirements.
>>>
>>> OK, I just found DeltaTrigger, which looks promising... Does it make
>>> sense to write a WindowAssigner that makes a new Window on every event,
>>> allocation rates aside?
>>>
>>> Thanks!
>>>
>>> -0xe1a
>>>
>>

-- 
Jacob Sevart
Software Engineer, Safety


Re: Checkpoints for kafka source sometimes get 55 GB size (instead of 2 MB) and flink job fails during restoring from such checkpoint

2020-04-17 Thread Jacob Sevart
> for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
>     unionOffsetStates.add(
>         Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
>     sb.append("partition: ").append(kafkaTopicPartitionLongEntry.getKey())
>         .append(" offset:").append(kafkaTopicPartitionLongEntry.getValue()).append('\n');
> }
>
> }
>
> if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
>     // truncate the map of pending offsets to commit, to prevent infinite growth
>     while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
>         pendingOffsetsToCommit.remove(0);
>     }
> }
> LOG.warn(sb.toString());
> }
> }
>
>
>
> On 4/14/20, 11:44 PM, "Timo Walther"  wrote:
>
> Hi Oleg,
>
> This does indeed sound like abnormal behavior. Are you sure that these
> large checkpoints are related to the Kafka consumer only? Are there other
> operators in the pipeline? Internally, the state kept in a Kafka consumer
> is pretty minimal and only related to Kafka partition and offset
> management.
>
> If you are sure that the Kafka consumer must produce such a state size,
> I would recommend using a remote debugger and checking what is
> checkpointed in the corresponding
> `FlinkKafkaConsumerBase#snapshotState`.
>
> Regards,
> Timo
>
>
> On 15.04.20 03:37, Oleg Vysotsky wrote:
> > Hello,
> >
> > Sometimes our Flink job starts creating large checkpoints that include
> > 55 GB (instead of 2 MB) related to the Kafka source. After the Flink job
> > creates the first “abnormal” checkpoint, all subsequent checkpoints are
> > “abnormal” as well. The Flink job can’t be restored from such a
> > checkpoint: restoring from it hangs or fails. The Flink dashboard also
> > hangs, and the Flink cluster crashes while restoring from such a
> > checkpoint. We didn’t catch a related error message, and we haven’t
> > found a clear way to reproduce the problem (i.e., to make the Flink job
> > create “abnormal” checkpoints).
> >
> > Configuration:
> >
> > We are using Flink 1.8.1 on EMR (EMR 5.27)
> >
> > Kafka: Confluent Kafka 5.4.1
> >
> > Flink Kafka connector:
> > org.apache.flink:flink-connector-kafka_2.11:1.8.1 (it includes the
> > org.apache.kafka:kafka-clients:2.0.1 dependency)
> >
> > Our input Kafka topic has 32 partitions, and the related Flink source
> > has a parallelism of 32.
> >
> > We use almost all default Flink Kafka consumer settings. We only
> > specified:
> >
> > CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
> >
> > ConsumerConfig.GROUP_ID_CONFIG,
> >
> > CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
> >
> > Thanks a lot in advance!
> >
> > Oleg
> >
>
>
>
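The truncation step at the end of the quoted snippet keeps pending offsets bounded by dropping the oldest entry while there are more than `MAX_NUM_PENDING_CHECKPOINTS`. As a plain-Java illustration of that policy only, and not the connector's actual data structure, `LinkedHashMap.removeEldestEntry` gives the same bounded, insertion-ordered behavior when entries are added one at a time. `PendingOffsets` and the String partition keys are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for a bounded "pending offsets to commit" structure:
 * checkpoint id -> (partition -> offset), keeping at most maxPending checkpoints
 * and evicting the oldest (first-inserted) one when the bound is exceeded.
 */
final class PendingOffsets extends LinkedHashMap<Long, Map<String, Long>> {
    private static final long serialVersionUID = 1L;
    private final int maxPending;

    PendingOffsets(int maxPending) {
        super(16, 0.75f, false); // insertion order, like removing index 0 in the snippet
        this.maxPending = maxPending;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<Long, Map<String, Long>> eldest) {
        // Called after each put; returning true drops the oldest pending checkpoint.
        return size() > maxPending;
    }
}
```

With a cap like this, the pending-offsets structure stays at a fixed number of checkpoints regardless of how long the job runs.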



Re: Very large _metadata file

2020-03-21 Thread Jacob Sevart
https://github.com/apache/flink/pull/11475


Re: Very large _metadata file

2020-03-21 Thread Jacob Sevart
Thanks, will do.

I only want the timestamp to reset when the job comes up with no state.
Checkpoint recoveries should keep the same value.

Jacob

On Sat, Mar 21, 2020 at 10:16 AM Till Rohrmann  wrote:

> Hi Jacob,
>
> If you could create a patch for updating the union state metadata
> documentation, that would be great. I can help with reviewing and merging
> this patch.
>
> If the value stays fixed over the lifetime of the job and you know it
> before starting the job, then you could use the config mechanism. What
> won't work is needing a different value for every restart: updating the
> config after a recovery is not possible.
>
> Cheers,
> Till
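Jacob's requirement (reset the timestamp only when the job starts without state, keep it across checkpoint recoveries) comes down to a small decision. Flink's `FunctionInitializationContext`, passed to `CheckpointedFunction#initializeState`, has an `isRestored()` method that would be the natural input. The plain-Java sketch below models only the decision, with a `Clock` injected for testing; `StateStartTime` is a hypothetical name. It also falls back to the current time when the job is restored but no value is found (for example, an operator newly added to the job), which is an assumption about the desired behavior.

```java
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;

/**
 * Hypothetical model of "reset the start time only on a fresh start":
 * keep the restored value on recovery, otherwise take the clock's current time.
 */
final class StateStartTime {
    private StateStartTime() {}

    /**
     * @param restored      whether the job is recovering from a checkpoint or savepoint
     * @param restoredValue the value recovered from state, if any
     * @param clock         time source, injectable for tests
     */
    static Instant resolve(boolean restored, Optional<Instant> restoredValue, Clock clock) {
        if (restored && restoredValue.isPresent()) {
            return restoredValue.get(); // recovery: keep the original value
        }
        return clock.instant();         // fresh start, or nothing restored: reset
    }
}
```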

Re: Very large _metadata file

2020-03-20 Thread Jacob Sevart
Thanks, makes sense.

What about using the config mechanism? We're collecting and distributing
some environment variables at startup; would it also work to include a
timestamp with them?

Also, would you be interested in a patch to note the caveat about union
state metadata in the documentation?

Jacob

On Tue, Mar 17, 2020 at 2:51 AM Till Rohrmann  wrote:

> Did I understand you correctly that you use the union state to synchronize
> the per-partition state across all operators in order to obtain a global
> overview? If this is the case, then this will only work in case of a
> failover. Only then are all operators restarted with the union of all
> operators' state. If the job never failed, there would never be an
> exchange of state.
>
> If you really need a global view over your data, then you need to create
> an operator with a parallelism of 1 which records all the different
> timestamps.
>
> Another idea could be to use the broadcast state pattern [1]. You could
> have an operator which extracts the java.time.Instant values, outputs them
> as a side output, and simply forwards the records on the main output. Then
> you could use the side output as the broadcast input and the main output as
> the normal input into the broadcast operator. The problem with this
> approach might be that you don't get ordering guarantees between the side
> and the main output.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> Cheers,
> Till
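Till's first suggestion, a single operator with parallelism 1 that records the timestamps, could keep something like the map below. This is a plain-Java sketch with hypothetical names. It assumes each upstream record has already been reduced to a (partition, timestamp) pair before reaching the parallelism-1 operator, and it keeps the earliest timestamp per partition so that out-of-order observations cannot move it forward.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical single-instance (parallelism 1) tracker that records the first time
 * each partition was seen, giving one global view of how old the job's state is.
 */
final class FirstSeenTracker {
    private final Map<Integer, Instant> firstSeen = new HashMap<>();

    /** Records an observation; only the earliest timestamp per partition is kept. */
    void observe(int partition, Instant timestamp) {
        firstSeen.merge(partition, timestamp, (a, b) -> a.isBefore(b) ? a : b);
    }

    /** Age of the oldest partition state, or empty if nothing has been observed yet. */
    Optional<Duration> oldestStateAge(Instant now) {
        return firstSeen.values().stream()
                .min(Instant::compareTo)
                .map(earliest -> Duration.between(earliest, now));
    }

    /** Age of one partition's state, or empty if that partition has not been seen. */
    Optional<Duration> stateAge(int partition, Instant now) {
        return Optional.ofNullable(firstSeen.get(partition))
                .map(first -> Duration.between(first, now));
    }
}
```

The per-partition view addresses the concern that partitions see their first traffic at different times, while `oldestStateAge` gives a single job-wide number.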

Re: Very large _metadata file

2020-03-16 Thread Jacob Sevart
Thanks! That would do it. I've disabled the operator for now.

The purpose was to know the age of the job's state, so that we could
consider its output in terms of how much context it knows. Regular state
seemed insufficient because partitions might see their first traffic at
different times.

How would you go about implementing something like that?

On Mon, Mar 16, 2020 at 1:54 PM Till Rohrmann  wrote:

> Hi Jacob,
>
> I think you are running into some deficiencies of Flink's union state
> here. The problem is that for every entry in your list state, Flink stores
> a separate offset (a long value). The reason for this behaviour is that we
> use the same state implementation for the union state as well as for the
> split state. For the latter, the offset information is required to split
> the state in case of changing the parallelism of your job.
>
> My recommendation would be to try to get rid of union state altogether.
> The union state was primarily introduced to checkpoint some source
> implementations and might become deprecated due to performance problems
> once these sources can be checkpointed differently.
>
> Cheers,
> Till

Re: Very large _metadata file

2020-03-13 Thread Jacob Sevart
Oh, I should clarify that's 43MB per partition, so with 48 partitions it
explains my 2GB.
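For reference, the numbers in this thread are consistent with one 8-byte offset (a Java `long`) per list entry: 5.3 million longs come to roughly 42 to 43 MB, and 48 handles of that size come to roughly 2 GB. They also happen to fit a simple growth model, which is only a hypothesis and not something established in the thread. If every subtask re-adds all of the union state it receives on restore, each restore multiplies the per-subtask list length by the parallelism, and 48 to the fourth power is 5,308,416. The plain-Java sketch below does this arithmetic. The class and method names are hypothetical, it has no Flink dependency, and it assumes each of the 48 partitions corresponds to one subtask's state handle.

```java
/**
 * Back-of-the-envelope model for union list state metadata (hypothetical names).
 * Assumes one 8-byte offset per list entry and, purely as a hypothesis, that every
 * subtask keeps all elements it receives from union redistribution on restore.
 */
final class UnionStateGrowth {
    private UnionStateGrowth() {}

    /** Per-subtask list length after the given number of restores. */
    static long entriesPerSubtask(int parallelism, long initialPerSubtask, int restores) {
        long perSubtask = initialPerSubtask;
        for (int i = 0; i < restores; i++) {
            // Union redistribution hands each subtask the lists of all subtasks.
            perSubtask = Math.multiplyExact(perSubtask, (long) parallelism);
        }
        return perSubtask;
    }

    /** Bytes spent on offsets alone in one subtask's state handle. */
    static long offsetBytesPerSubtask(long entries) {
        return Math.multiplyExact(entries, (long) Long.BYTES);
    }

    /** Offset bytes across all subtask handles in the _metadata file. */
    static long offsetBytesTotal(int parallelism, long entriesPerSubtask) {
        return Math.multiplyExact(offsetBytesPerSubtask(entriesPerSubtask), (long) parallelism);
    }
}
```

With a parallelism of 48, one initial entry per subtask, and four restores, this gives 5,308,416 entries per subtask, 42,467,328 bytes of offsets per handle, and 2,038,431,744 bytes in total (about 1.9 GiB).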

Re: Very large _metadata file

2020-03-13 Thread Jacob Sevart
Running *Checkpoints.loadCheckpointMetadata* under a debugger, I found
something:
*subtaskState.managedOperatorState[0].stateNameToPartitionOffsets("startup-times").offsets.value*
weighs 43MB (5.3 million longs).

"startup-times" is an operator state of mine (union list of
java.time.Instant). I see a way to end up with fewer items in the list, but
I'm not sure how the actual size is related to the number of offsets. Can
you elaborate on that?

Incidentally, 42.5MB is the number I got out of
https://issues.apache.org/jira/browse/FLINK-14618. So I think my two
problems are closely related.

Jacob

On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu  wrote:

> Hi
>
> As Gordon said, the metadata will contain ByteStreamStateHandles. When a
> ByteStreamStateHandle is written out, its handle name, which is a path (as
> you saw), is written out too. A ByteStreamStateHandle is created when the
> state size is smaller than `state.backend.fs.memory-threshold` (default is
> 1024).
>
> If you want to verify this, you can refer to the unit test
> `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the
> metadata; you will find that there are many `ByteStreamStateHandle`s, and
> their names are the strings you saw in the metadata.
>
> Best,
> Congxian
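Congxian's and Gordon's explanations amount to a simple rule: a state chunk under `state.backend.fs.memory-threshold` is embedded in `_metadata` together with its handle name, while a larger chunk goes to its own file and only its path is recorded. The plain-Java model below estimates how many bytes such chunks contribute to the metadata file. The names are hypothetical, the average path length is an assumed parameter, serialization overhead is ignored, and whether a chunk exactly at the threshold is inlined is not settled by this thread.

```java
import java.util.List;

/**
 * Hypothetical model of the threshold rule: chunks below the threshold are inlined
 * into _metadata along with a handle name; larger chunks contribute only a path.
 */
final class MetadataSizeModel {
    private MetadataSizeModel() {}

    /** True if a chunk of this size would be kept inline (the exact boundary is assumed). */
    static boolean storedInline(long chunkBytes, long thresholdBytes) {
        return chunkBytes < thresholdBytes;
    }

    /**
     * Rough bytes contributed to _metadata. pathBytes is an assumed average length
     * for a handle name or file path, not a Flink constant.
     */
    static long approxMetadataBytes(List<Long> chunkSizes, long thresholdBytes, long pathBytes) {
        long total = 0;
        for (long size : chunkSizes) {
            // Inline: payload plus handle name. File-backed: path only.
            total += storedInline(size, thresholdBytes) ? size + pathBytes : pathBytes;
        }
        return total;
    }
}
```

The model shows why a metadata file made mostly of paths stays small: large state only adds a pointer, and inlined payloads are capped per chunk by the threshold.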

Re: Very large _metadata file

2020-03-05 Thread Jacob Sevart
Thanks, I will monitor that thread.

I'm having a hard time following the serialization code, but if you know
anything about the layout, tell me if this makes sense. What I see in the
hex editor is, first, many HDFS paths. Then gigabytes of unreadable data.
Then finally another HDFS path at the end.

If it is putting state in there, under normal circumstances, does it make
sense that it would be interleaved with metadata? I would expect all the
metadata to come first, and then state.

Jacob

On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas  wrote:

> Hi Jacob,
>
> As I said previously, I am not 100% sure what could be causing this
> behavior, but there is a related thread here:
>
> https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E
>
> where you can re-post your problem and monitor for answers.
>
> Cheers,
> Kostas
>
> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart  wrote:
> >
> > Kostas and Gordon,
> >
> > Thanks for the suggestions! I'm on RocksDB. We don't have that setting
> > configured, so it should be at the default of 1024 bytes. This is the
> > full "state.*" section showing in the JobManager UI.
> >
> >
> >
> > Jacob
> >
> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai 
> wrote:
> >>
> >> Hi Jacob,
> >>
> >> Apart from what Klou already mentioned, one other possible reason:
> >>
> >> If you are using the FsStateBackend, it is also possible that your
> state is small enough to be stored inline within the metadata file.
> >> That is governed by the "state.backend.fs.memory-threshold"
> configuration, with a default value of 1024 bytes, or can also be
> configured with the `fileStateSizeThreshold` argument when constructing the
> `FsStateBackend`.
> >> The purpose of that threshold is to ensure that the backend does not
> create a large amount of very small files, where potentially the file
> pointers are actually larger than the state itself.
> >>
> >> Cheers,
> >> Gordon
> >>
> >>
> >>
> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas 
> wrote:
> >>>
> >>> Hi Jacob,
> >>>
> >>> Could you specify which StateBackend you are using?
> >>>
> >>> The reason I am asking is that, from the documentation in [1]:
> >>>
> >>> "Note that if you use the MemoryStateBackend, metadata and savepoint
> >>> state will be stored in the _metadata file. Since it is
> >>> self-contained, you may move the file and restore from any location."
> >>>
> >>> I am also cc'ing Gordon who may know a bit more about state formats.
> >>>
> >>> I hope this helps,
> >>> Kostas
> >>>
> >>> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
> >>>
> >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
> >>> >
> >>> > Per the documentation:
> >>> >
> >>> > "The meta data file of a Savepoint contains (primarily) pointers to
> all files on stable storage that are part of the Savepoint, in form of
> absolute paths."
> >>> >
> >>> > I somehow have a _metadata file that's 1.9GB. Running strings on it
> I find 962 strings, most of which look like HDFS paths, which leaves a lot
> of that file-size unexplained. What else is in there, and how exactly could
> this be happening?
> >>> >
> >>> > We're running 1.6.
> >>> >
> >>> > Jacob
> >
> >
> >
> > --
> > Jacob Sevart
> > Software Engineer, Safety
>


-- 
Jacob Sevart
Software Engineer, Safety


Re: Very large _metadata file

2020-03-04 Thread Jacob Sevart
Kostas and Gordon,

Thanks for the suggestions! I'm on RocksDB. We don't have that setting
configured so it should be at the default 1024b. This is the full "state.*"
section showing in the JobManager UI.

[image: Screen Shot 2020-03-04 at 9.56.20 AM.png]
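To reason about whether inlined small state could plausibly account for a 1.9GB `_metadata` file, the inline-versus-file rule Gordon describes can be modelled in a few lines. This is a simplified, hypothetical model, not Flink's serialization code: `InlineThresholdModel` is illustrative, the 200-byte cost of one file pointer is an assumed placeholder, and treating a handle exactly at the threshold as inline is also an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of the "state.backend.fs.memory-threshold" rule:
// small state is embedded in _metadata, larger state is written to its own file and
// only a pointer is recorded. This is NOT Flink's actual serialization code.
final class InlineThresholdModel {

    static final long DEFAULT_THRESHOLD_BYTES = 1024; // default value quoted in the thread
    static final long ASSUMED_POINTER_BYTES = 200;    // assumption: rough size of one absolute HDFS path

    /** Estimated bytes that a list of state-handle sizes contributes to _metadata. */
    static long estimateMetadataBytes(List<Long> handleSizes, long thresholdBytes) {
        long total = 0;
        for (long size : handleSizes) {
            // Assumption: a handle at or below the threshold is inlined byte-for-byte.
            total += (size <= thresholdBytes) ? size : ASSUMED_POINTER_BYTES;
        }
        return total;
    }

    /** Lower bound on how many inlined handles are needed to produce inlineBytes. */
    static long minInlineHandles(long inlineBytes, long thresholdBytes) {
        return (inlineBytes + thresholdBytes - 1) / thresholdBytes; // ceiling division
    }

    public static void main(String[] args) {
        List<Long> sizes = Arrays.asList(100L, 900L, 5_000_000L); // hypothetical handle sizes
        System.out.println(estimateMetadataBytes(sizes, DEFAULT_THRESHOLD_BYTES)); // 100 + 900 + 200
        System.out.println(minInlineHandles(1_900_000_000L, DEFAULT_THRESHOLD_BYTES));
    }
}
```

Under this model each inlined handle contributes at most `threshold` bytes, so reading 1.9GB as 1.9 × 10^9 bytes, the 1024-byte default would need at least 1,855,469 inlined handles. If the job has far fewer state handles than that, inlining alone is unlikely to explain the file size.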

Jacob

On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Jacob,
>
> Apart from what Klou already mentioned, one other possible reason:
>
> If you are using the FsStateBackend, it is also possible that your state
> is small enough to be stored inline within the metadata file.
> That is governed by the "state.backend.fs.memory-threshold" configuration,
> with a default value of 1024 bytes, or can also be configured with the
> `fileStateSizeThreshold` argument when constructing the `FsStateBackend`.
> The purpose of that threshold is to ensure that the backend does not
> create a large amount of very small files, where potentially the file
> pointers are actually larger than the state itself.
>
> Cheers,
> Gordon
>
>
>
> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas  wrote:
>
>> Hi Jacob,
>>
>> Could you specify which StateBackend you are using?
>>
>> The reason I am asking is that, from the documentation in [1]:
>>
>> "Note that if you use the MemoryStateBackend, metadata and savepoint
>> state will be stored in the _metadata file. Since it is
>> self-contained, you may move the file and restore from any location."
>>
>> I am also cc'ing Gordon who may know a bit more about state formats.
>>
>> I hope this helps,
>> Kostas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>>
>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
>> >
>> > Per the documentation:
>> >
>> > "The meta data file of a Savepoint contains (primarily) pointers to all
>> files on stable storage that are part of the Savepoint, in form of absolute
>> paths."
>> >
>> > I somehow have a _metadata file that's 1.9GB. Running strings on it I
>> find 962 strings, most of which look like HDFS paths, which leaves a lot of
>> that file-size unexplained. What else is in there, and how exactly could
>> this be happening?
>> >
>> > We're running 1.6.
>> >
>> > Jacob
>>
>

-- 
Jacob Sevart
Software Engineer, Safety


Very large _metadata file

2020-03-03 Thread Jacob Sevart
Per the documentation:

"The meta data file of a Savepoint contains (primarily) pointers to all
files on stable storage that are part of the Savepoint, in form of absolute
paths."

I somehow have a _metadata file that's 1.9GB. Running *strings* on it I
find 962 strings, most of which look like HDFS paths, which leaves a lot of
that file-size unexplained. What else is in there, and how exactly could
this be happening?

We're running 1.6.
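The "unexplained" portion of the file size can be turned into a number by approximating what `strings` does by default (runs of at least four printable ASCII bytes, tab included) and summing the bytes those runs cover. This is a minimal Java sketch, assuming a plain byte-stream scan is close enough to the `strings` build in use. `PrintableCoverage` is a hypothetical helper, not part of Flink.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical helper, not a Flink tool: approximates the default `strings` rule
// (runs of >= minRun printable ASCII bytes, tab included) and reports how many
// bytes of the input those runs cover.
final class PrintableCoverage {

    /** Returns {number of runs, bytes inside runs, total bytes read}. */
    static long[] scan(InputStream in, int minRun) throws IOException {
        long runs = 0, covered = 0, total = 0, current = 0;
        for (int b; (b = in.read()) != -1; total++) {
            boolean printable = (b >= 0x20 && b < 0x7f) || b == '\t';
            if (printable) {
                current++;
            } else {
                // A non-printable byte ends the current run; keep it only if it is long enough.
                if (current >= minRun) { runs++; covered += current; }
                current = 0;
            }
        }
        if (current >= minRun) { runs++; covered += current; } // a run that reaches EOF
        return new long[] {runs, covered, total};
    }

    public static void main(String[] args) throws IOException {
        try (InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(args[0])))) {
            long[] r = scan(in, 4); // 4 is the default minimum length used by `strings`
            System.out.printf("%d runs cover %d of %d bytes; %d bytes are not printable text%n",
                    r[0], r[1], r[2], r[2] - r[1]);
        }
    }
}
```

Comparing the run count with the 962 strings found earlier is a quick sanity check that the approximation matches the `strings` output before trusting the byte totals.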

Jacob