Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-01-13 Thread Maciej Obuchowski
Hello,

I'm an OpenLineage committer - and previously, a minor Flink contributor.
The OpenLineage community is very interested in a conversation about Flink
metadata, and we'd be happy to cooperate with the Flink community.

Best,
Maciej Obuchowski



On Thu, 13 Jan 2022 at 18:12, Martijn Visser wrote:
>
> Hi all,
>
> @Andrew thanks for sharing that!
>
> @Tero good point, I should have clarified the purpose. I want to understand
> what "metadata platforms" tools are used or evaluated by the Flink
> community, what's their purpose for using such a tool (is it as a generic
> catalogue, as a data discovery tool, is lineage the important part etc) and
> what problems are people trying to solve with them. This space is
> developing rapidly and there are many open source and commercial tools
> popping up/growing, which is also why I'm trying to keep an open vision on
> how this space is evolving.
>
> If the Flink community wants to integrate with metadata tools, I fully
> agree that ideally we do that via standards. My perception is at this
> moment that no clear standard has yet been established. You mentioned
> open-metadata.org, but I believe https://openlineage.io/ is also an
> alternative standard.
>
> Best regards,
>
> Martijn
>
> On Thu, 13 Jan 2022 at 17:00, Tero Paananen  wrote:
>
> > > I'm currently checking out different metadata platforms, such as
> > Amundsen [1] and Datahub [2]. In short, these types of tools try to address
> > problems related to topics such as data discovery, data lineage and an
> > overall data catalogue.
> > >
> > > I'm reaching out to the Dev and User mailing lists to get some feedback.
> > It would really help if you could spend a couple of minutes to let me know
> > if you already use either one of the two mentioned metadata platforms or
> > another one, or are evaluating such tools? If so, is it for use as a
> > catalogue, for lineage, or anything else? Any type of feedback
> > on these types of tools is appreciated.
> >
> > I hope you don't mind answers off-list.
> >
> > You didn't say what purpose you're evaluating these tools for, but if
> > you're evaluating platforms for integration with Flink, I wouldn't
> > approach it with a particular product in mind. Rather I'd create some
> > sort of facility to propagate metadata and/or lineage information in a
> > generic way and allow Flink users to plug in their favorite metadata
> > tool, using standards like OpenLineage, for example. I believe Egeria
> > is also trying to create an open standard for metadata.
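Purely as an illustration of that idea (no such hook exists in Flink today; all names below are hypothetical), such a pluggable facility could look roughly like this:

```
// Hypothetical SPI sketch - not an existing Flink API.
interface FlinkLineageListener {
    // Called when a job is submitted, with a generic description of its sources and sinks.
    fun onJobSubmitted(jobName: String, sources: List<String>, sinks: List<String>)
    // Called when a job finishes or fails.
    fun onJobFinished(jobName: String, succeeded: Boolean)
}

// A user-provided implementation would translate these callbacks into the model of
// their metadata tool, e.g. OpenLineage run events or Egeria metadata instances.
class StdoutLineageListener : FlinkLineageListener {
    override fun onJobSubmitted(jobName: String, sources: List<String>, sinks: List<String>) =
        println("submitted job=$jobName sources=$sources sinks=$sinks")

    override fun onJobFinished(jobName: String, succeeded: Boolean) =
        println("finished job=$jobName succeeded=$succeeded")
}
```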
> >
> > If you're evaluating data catalogs for personal use or use in a
> > particular project, Andrew's answer about the Wikimedia evaluation is
> > a good start. It's missing OpenMetadata (https://open-metadata.org/).
> > That one is showing a LOT of promise. Wikimedia's evaluation is also
> > missing industry leading commercial products (understandably, given
> > their mission). Collibra and Alation are probably the ones that pop up
> > most often.
> >
> > I have personally looked into both DataHub and Amundsen. My high level
> > feedback is that DataHub is overengineered and built on proprietary
> > LinkedIn technology platform(s), which aren't widely used elsewhere.
> > Amundsen is much less flexible than DataHub and quite basic in its
> > functionality. If you need anything beyond what it already offers,
> > good luck.
> >
> > We dumped Amundsen in favor of OpenMetadata a few months back. We
> > don't have enough data points to fully evaluate OpenMetadata yet.
> >
> > -TPP
> >


Re: Dead Letter Queue for JdbcSink

2021-08-03 Thread Maciej Obuchowski
Hey.

As far as I can see, you're not overriding functions like open,
setRuntimeContext, snapshotState and initializeState - the calls need to
be passed through to the inner sink function.
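A minimal sketch of such a delegating wrapper (assuming the inner sink comes from JdbcSink.sink and the DLQ sink takes JSON strings; untested, class and field names are illustrative):

```
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

class DelegatingDlqSink<T>(
    private val inner: SinkFunction<T>,
    private val dlq: SinkFunction<String>
) : RichSinkFunction<T>(), CheckpointedFunction {

    override fun open(parameters: Configuration) {
        // Without these two calls the wrapped JdbcSink never gets a runtime
        // context and never opens its JDBC connection.
        (inner as? RichSinkFunction<T>)?.setRuntimeContext(runtimeContext)
        (inner as? RichSinkFunction<T>)?.open(parameters)
    }

    override fun close() {
        (inner as? RichSinkFunction<T>)?.close()
    }

    // The JDBC sink flushes its batches on checkpoints, so these must be
    // forwarded as well or buffered records may be silently dropped.
    override fun snapshotState(context: FunctionSnapshotContext) {
        (inner as? CheckpointedFunction)?.snapshotState(context)
    }

    override fun initializeState(context: FunctionInitializationContext) {
        (inner as? CheckpointedFunction)?.initializeState(context)
    }

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            inner.invoke(value, context)
        } catch (ex: Exception) {
            // Route the failing record to the dead-letter sink instead of failing the job.
            dlq.invoke("""{"value":"$value","error":"${ex.message}"}""", context)
        }
    }
}
```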

On Mon, 2 Aug 2021 at 19:31, Rion Williams wrote:
>
> Hi again Maciek (and all),
>
> I just recently returned to start investigating this approach, however I 
> can't seem to get the underlying invocation to work as I would normally 
> expect. I'll try to share a bit more of what I currently have and perhaps I'm 
> just missing something minor that someone may be able to spot.
>
> To reiterate - what I'm attempting to do is take a stream of events flowing
> through, extract specific types of entities from these events into
> multiple side-outputs, and pass these side-outputs to a sink that will
> write them via JDBC using logic specific to that entity. What I am aiming to
> achieve is being able to capture a single record that may be problematic (a
> poison pill) and throw it onto a dead-letter queue (Kafka) instead of failing
> the job. I understand this would mean limiting batch sizes to a single record,
> however I'm assuming that the connections themselves could possibly be pooled
> to avoid opening up a new connection per call. If this isn't the case, is there
> a way to handle that (or would I need to implement my own sink)?
>
> ```
> val users = Tags.users
> parsedChangelogs
>     .getSideOutput(users)
>     .addSink(PostgresSink.fromEntityType(users.typeInfo, parameters))
>     .uid("sync-${users.id}-to-postgres")
>     .name("sync-${users.id}-to-postgres")
>
> val addresses = Tags.addresses
> parsedChangelogs
>     .getSideOutput(addresses)
>     .addSink(PostgresSink.fromEntityType(addresses.typeInfo, parameters))
>     .uid("sync-${addresses.id}-to-postgres")
>     .name("sync-${addresses.id}-to-postgres")
> ```
>
> And the dynamic sink (that would associate a given entity to the necessary 
> calls made to the database) looks a bit like this:
>
> ```
> fun <T> fromEntityType(typeInfo: TypeInformation<T>, parameters: ParameterTool): SinkFunction<T> {
>     val metadata = getQueryMetadataFromType(typeInfo)
>
>     return JdbcSink
>         .sink(
>             metadata.query,
>             metadata.statement,
>             getJdbcExecutionOptions(parameters),
>             JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>                 .withDriverName("org.postgresql.Driver")
>                 .withUrl(buildConnectionString(parameters))
>                 .build(),
>         )
> }
> ```
>
> I've tried several approaches; a naive wrapper that I attempted looked
> something like this:
>
> ```
> class DlqWrapper<T>(private val sink: SinkFunction<T>, val parameters: ParameterTool): SinkFunction<T> {
>     private val logger = LoggerFactory.getLogger(DlqWrapper::class.java)
>     private val dlqSink: SinkFunction<String> = ...
>
>     override fun invoke(value: T, context: SinkFunction.Context) {
>         try {
>             sink.invoke(value, context)
>         }
>         catch (ex: Exception) {
>             logger.error("Encountered sink exception. Sending message to dead letter queue. Value: $value. Exception: ${ex.message}")
>             val payload = Gson().toJsonTree(value).asJsonObject
>             payload.addProperty("exception", ex.message)
>
>             dlqSink.invoke("$payload", context)
>         }
>     }
> }
> ```
>
> After doing this, it doesn't look like the wrapped sink actually attempts to
> perform the JDBC calls to insert the records into those sources when invoke is
> called. I'm not entirely sure if this is related specifically to how
> the JdbcSink is wrapped (via the GenericJdbcSink, etc.).
>
> I had seen several posts around involving the use of an 
> InvocationHandler/Proxy, etc. but I'm not sure if that should be necessary 
> for handling this type of functionality. Any ideas/thoughts/examples would be 
> greatly appreciated.
>
> Thanks,
>
> Rion
>
> On 2021/07/14 15:47:18, Maciej Bryński  wrote:
> > This is the idea.
> > Of course you need to wrap more functions like: open, close,
> > notifyCheckpointComplete, snapshotState, initializeState and
> > setRuntimeContext.
> >
> > The problem is that if you want to catch a problematic record you need
> > to set batch size to 1, which gives very bad performance.
> >
> > Regards,
> > Maciek
> >
> > On Wed, 14 Jul 2021 at 17:31, Rion Williams wrote:
> > >
> > > Hi Maciej,
> > >
> > > Thanks for the quick response. I wasn't aware of the idea of using a 
> > > SinkWrapper, but I'm not quite certain that it would suit this specific 
> > > use case (as a SinkFunction / RichSinkFunction doesn't appear to support 
> > > side-outputs). Essentially, what I'd hope to accomplish would be to pick 
> > > up when a bad record could not be written to the sink and then offload 
> > > that via a side-output somewhere else.
> > >
> > > Something l

Re: Stream processing into single sink to multiple DB Schemas

2021-06-07 Thread Maciej Obuchowski
Hey,
We had a similar problem, but with thousands of tables. I've created an issue [1]
and a PR with our internally used solution [2], but unfortunately there seems to
be no interest in upstreaming this feature.

Thanks,
Maciej

[1] https://issues.apache.org/jira/browse/FLINK-21643
[2] https://github.com/apache/flink/pull/15102

On Mon, 7 Jun 2021 at 17:15, Nicolaus Weidner <nicolaus.weid...@data-artisans.com> wrote:

> Hi Tamir,
>
> I assume you want to use the Jdbc connector?
> You can use three filters on your input stream to separate it into three
> separate streams, then add a sink to each of those (see e.g. [1]). Then you
> can have a different SQL statement for each of the three sinks. If you
> specify the driver name in JdbcConnectionOptions, that driver will be used
> to obtain a DB connection (see [2]). So if you use a pooling driver (e.g.
> [3]), connections should automatically be taken from a shared pool.
>
> Does that help?
>
> Best wishes,
> Nico
>
> [1]
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/#jdbc-connection-parameters
> [3] https://commons.apache.org/proper/commons-dbcp/
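A minimal sketch of that filter-plus-sinks approach, assuming a simple Employee type, illustrative table names, and the Commons DBCP pooling driver from [3]; this is a sketch, not a tested implementation:

```
import org.apache.flink.connector.jdbc.JdbcConnectionOptions
import org.apache.flink.connector.jdbc.JdbcExecutionOptions
import org.apache.flink.connector.jdbc.JdbcSink
import org.apache.flink.connector.jdbc.JdbcStatementBuilder
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.sink.SinkFunction

data class Employee(val name: String, val salary: Double)

// One JdbcSink per target schema, all writing over the same pooled driver.
fun employeeSink(table: String, jdbcUrl: String): SinkFunction<Employee> =
    JdbcSink.sink(
        "INSERT INTO $table (name, salary) VALUES (?, ?)",
        JdbcStatementBuilder<Employee> { stmt, e ->
            stmt.setString(1, e.name)
            stmt.setDouble(2, e.salary)
        },
        JdbcExecutionOptions.builder().withBatchSize(100).build(),
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName("org.apache.commons.dbcp2.PoolingDriver") // pooling driver from [3]
            .withUrl(jdbcUrl)
            .build()
    )

fun attachSinks(employees: DataStream<Employee>, jdbcUrl: String) {
    // Three filters split the single input stream into the three salary bands.
    employees.filter { it.salary <= 20_000 }
        .addSink(employeeSink("schema_a.employees", jdbcUrl))
    employees.filter { it.salary > 20_000 && it.salary <= 50_000 }
        .addSink(employeeSink("schema_b.employees", jdbcUrl))
    employees.filter { it.salary > 50_000 }
        .addSink(employeeSink("schema_c.employees", jdbcUrl))
}
```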
>
> On Mon, Jun 7, 2021 at 8:23 AM Tamir Sagi wrote:
>
>> Hey Community
>>
>> Assuming there are 3 groups,
>> A, B, C
>>
>> Each group represents a set of data about employees and salaries.
>> Group A ( 0-20K $)
>> Group B (20K$ - 50K$)
>> Group C ( > 50K$)
>>
>> Is it possible to process stream data from a single source containing
>> information about employees and salaries and split the data into different
>> DB schemas on the same DB? (Single Sink - *Single Connection*)
>>
>> I encountered side outputs and dynamic tables:
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/side_output/
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/
>>
>> I'm not sure it's the right way.
>>
>> If there is a better way, please enlighten me.
>>
>> Thank you,
>>
>> Tamir.
>>
>>
>>
>>
>


Re: Handling Bounded Sources with KafkaSource

2021-03-14 Thread Maciej Obuchowski
Hey Rion,

We solved this issue by using the usual unbounded streams and using the
Awaitility library to express conditions that end the test - for
example, particular data being present in a table.

IMO this type of testing has the advantage that you won't see divergent
behavior from production, as you have experienced.
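For illustration, a sketch of such a test, reusing the helper names from Rion's example below (sendToKafka, queryCount, parameters, etc.) and assuming Awaitility 4.x with JUnit 5:

```
import java.time.Duration
import java.util.concurrent.CompletableFuture
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

class EntityIdentificationJobIT {

    @Test
    fun `writes identified users to postgres`() {
        // Arrange: produce a finite set of test records to Kafka.
        sendToKafka(getTestEvents(), parameters)

        // Act: run the unbounded streaming job in the background - it never ends on its own.
        val job = CompletableFuture.runAsync { EntityIdentificationJob.run(parameters) }

        // Assert: poll the external effect until it holds (or time out).
        await()
            .atMost(Duration.ofMinutes(2))
            .pollInterval(Duration.ofSeconds(1))
            .untilAsserted {
                assertEquals(1, queryCount("SELECT * FROM users", connection))
            }

        // Stop waiting on the background job (in a real test, cancel the Flink job / mini-cluster here).
        job.cancel(true)
    }
}
```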

Regards,
Maciej



On Sun, Mar 14, 2021, 05:41 Rion Williams  wrote:

> Following up on this issue, I realized my initial problem was that my test
> case only contained a single message to send through the pipeline. This
> resulted in the earliest offset also being the latest and things didn’t
> exactly work as expected. Once I added several other messages and sent them
> through, the pipeline appeared to run as expected.
>
> However, the use of “bounded” seems to be fickle in terms of test cases.
> Since an exception is thrown once the bound is reached, I can typically
> just wrap my test execution within a try/catch and simply apply my
> assertion afterwards.
>
> This occasionally results in passing tests, but in others, it seems that
> the bound is reached prior to processing the messages it had seen thus far,
> and as a result yields a failing test. I don’t know if this is a bug, or
> intentional, but I’m not aware of a workaround that could “force” the
> pipeline to finish processing all of the messages from the topic once the
> bound is reached. I’ve tried sending through “flush records” to the topic,
> however since there are multiple partitions, it’s not guaranteed that the
> pipeline will read those last.
>
> This is purely a testing problem, as a production job would be streaming
> and unbounded, however I’d love to have a reliable integration test or a
> pattern that I could use to guarantee the processing of a finite set of
> data via a KafkaSource (I.e. send finite records to Kafka, read from topic,
> process all records, apply assertion after processing).
>
> Any ideas/recommendations/workarounds would be greatly welcome and I’d be
> happy to share my specific code / use-cases if needed.
>
> Thanks much,
>
> Rion
>
> On Mar 12, 2021, at 10:19 AM, Rion Williams  wrote:
>
> 
> Hi all,
>
> I've been using the KafkaSource API as opposed to the classic consumer and
> things have been going well. I configured my source such that it could be
> used in either a streaming or bounded mode, with the bounded approach
> specifically aimed at improving testing (unit/integration).
>
> I've noticed that when I attempt to run through a test - it seems that the
> pipeline never acknowledges the "end" of the stream in a bounded context
> and just runs forever and never makes it to my assert.
>
> Does anything look glaringly wrong with how the source is being defined?
>
> object KafkaEventSource {
>
> fun withParameters(parameters: ParameterTool): KafkaSource {
> val schemaRegistryUrl = parameters.getRequired("schema.registry.url")
>
> val builder = KafkaSource.builder()
> .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
> .setGroupId(parameters.getRequired("group.id"))
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setProperty("schema.registry.url", schemaRegistryUrl)
> .setTopics(parameters.getRequired("topic"))
> .setDeserializer(EventDeserializer(schemaRegistryUrl))
>
> if (parameters.getBoolean("bounded", false)) {
> builder.setBounded(OffsetsInitializer.latest())
> }
>
> return builder.build()
> }
> }
>
> I can verify that the generated source has its boundedness set properly
> and all of the configuration options are correct.
>
> My test itself is fairly simple and can be broken down as follows:
>
>1. Inject records into a Kafka Topic
>2. Initialize my Flink job using all of my testing parameters
>3. Apply my assertion (in this case verifying that a JdbcSink wrote to
>a specific database)
>
> @Test
> fun `Example `(){
> // Arrange
> val events = getTestEvents()
> sendToKafka(events, parameters)
>
> // Act
> EntityIdentificationJob.run(parameters)
>
> // Assert
> val users = queryCount("SELECT * FROM users", connection)
> assertEquals(1, users)
> }
>
> Where my job itself is broken down further and reads from the source,
> performs a process function into multiple side outputs and writes each of
> them to a distinct JdbcSink based on the type:
>
> @JvmStatic
> fun main(args: Array<String>) {
> val parameters = loadParams(args)
> val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
> // Read from Kafka
> val entities = stream
>.fromSource(KafkaEventSource.withParameters(parameters), 
> WatermarkStrategy.noWatermarks(), "kafka")
>.process(IdentifyEntitiesFunction())
>
> // Write out each tag to its respective sink
> for (entityType in EntityTypes.all) {
> entities
> .getSideOutput(entityType)
> .addSink(PostgresEnt

Re: Dynamic JDBC Sink Support

2021-03-06 Thread Maciej Obuchowski
Hey Rion,

I had exactly the same problem and implemented this functionality in
my Flink fork with the XA sink taken from the development branch.
As I see that it's not only my problem, I've created a Jira task for
it - FLINK-21643 - and will provide a draft PR for it.

@David - for traditional relational databases even a relatively small
number of connections can be unreasonable here.

Thanks,
Maciej

On Fri, 5 Mar 2021 at 21:55, David Anderson wrote:
>
> Rion,
>
> A given JdbcSink can only write to one table, but if the number of tables 
> involved isn't unreasonable, you could use a separate sink for each table, 
> and use side outputs [1] from a process function to steer each record to the 
> appropriate sink.
>
> I suggest you avoid trying to implement a sink.
>
> In general, custom sinks need to implement their own checkpointing, though 
> there is a generic two phase commit sink you can use as a starting point for 
> implementing a transactional sink. FYI, the JDBC sink has been reworked for 
> 1.13 to include exactly-once guarantees based on the XA standard [2].
>
> Regards,
> David
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
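For illustration, a rough sketch of the side-output routing described above; the Attribute hierarchy, tag names, and per-table sinks are assumptions:

```
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.util.Collector
import org.apache.flink.util.OutputTag

interface Attribute
data class UserAttribute(val id: String) : Attribute
data class AddressAttribute(val id: String) : Attribute

// Anonymous subclasses keep the generic type information for the side outputs.
val userTag = object : OutputTag<UserAttribute>("users") {}
val addressTag = object : OutputTag<AddressAttribute>("addresses") {}

// The process function only steers each record to the tag for its type.
class RouteAttributesFunction : ProcessFunction<Attribute, Attribute>() {
    override fun processElement(
        value: Attribute,
        ctx: ProcessFunction<Attribute, Attribute>.Context,
        out: Collector<Attribute>
    ) {
        when (value) {
            is UserAttribute -> ctx.output(userTag, value)
            is AddressAttribute -> ctx.output(addressTag, value)
            else -> out.collect(value) // anything unrouted stays on the main stream
        }
    }
}

fun wire(
    attributes: DataStream<Attribute>,
    userSink: SinkFunction<UserAttribute>,       // e.g. a JdbcSink for the users table
    addressSink: SinkFunction<AddressAttribute>  // e.g. a JdbcSink for the addresses table
) {
    val routed = attributes.process(RouteAttributesFunction())
    routed.getSideOutput(userTag).addSink(userSink)
    routed.getSideOutput(addressTag).addSink(addressSink)
}
```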
>
> On Fri, Mar 5, 2021 at 7:34 PM Rion Williams  wrote:
>>
>> Hi all,
>>
>> I’ve been playing around with a proof-of-concept application with Flink to 
>> assist a colleague of mine. The application is fairly simple (take in a 
>> single input and identify various attributes about it) with the goal of 
>> outputting those to separate tables in Postgres:
>>
>> object AttributeIdentificationJob {
>> @JvmStatic
>> fun main(args: Array<String>) {
>> val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>
>> stream
>> .addSource(ReadFromKafka())
>> .process(IdentifyAttributesFunction())
>> .addSink(DynamicJdbcHere())
>>
>> // execute program
>> stream.execute("Attribute Identification")
>> }
>> }
>>
>> Considering my attributes may be of varying types (all implementing an 
>> Attribute interface), I don't know if the existing JdbcSink functionality or 
>> some variant of it (i.e. one of the dynamic ones that I see listed) could 
>> handle this functionality. Essentially for a given "bundle" of records, I'd 
>> need to ensure that each respective type of attribute was upserted into its 
>> corresponding table within a Postgres database.
>>
>> Is that something that the connector can handle on its own? Or would I need
>> to implement my own RichSinkFunction that could
>> handle opening a connection to Postgres and dynamically generating the
>> appropriate UPSERT statements to handle sending the records? As a follow-up
>> to that, if I did need to write my own RichSinkFunction, would I need to
>> implement my own checkpointing for resilience purposes or does that come
>> along for the ride with RichSinkFunctions?
>>
>> Any insight or approaches would be welcome!
>>
>> Thanks,
>>
>> Rion


Re: Jackson object serialisations

2021-02-24 Thread Maciej Obuchowski
Hey Lasse,
I've had a similar case, albeit with Avro. I was reading from multiple
Kafka topics which all carried different objects, and did some
metadata-driven operations on them.
I could not go with any concrete predefined types for them, because
there were hundreds of different object types.

My solution was to serialize the object itself manually as byte[] and
deserialize it manually in the operator.
You can do it the same way using something like
objectMapper.writeValueAsBytes and transfer the data as a Tuple2:
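A rough sketch of that workaround, assuming the incoming stream carries ObjectNode and each record has a routing field; the field names and helper functions are illustrative:

```
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.typeinfo.TypeHint
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.datastream.DataStream

val mapper = ObjectMapper()

// Upstream: serialize the Jackson tree manually, so only byte[] crosses operator boundaries
// and Flink never has to pick a serializer for ObjectNode.
fun toBytes(json: DataStream<ObjectNode>): DataStream<Tuple2<String, ByteArray>> =
    json.map { node -> Tuple2.of(node.get("type").asText(), mapper.writeValueAsBytes(node)) }
        .returns(TypeInformation.of(object : TypeHint<Tuple2<String, ByteArray>>() {}))

// Downstream: deserialize back to ObjectNode inside the operator that needs it,
// and emit a plain Flink-friendly type.
fun handle(records: DataStream<Tuple2<String, ByteArray>>): DataStream<String> =
    records.map { t ->
        val node = mapper.readTree(t.f1) as ObjectNode
        // ... metadata-driven work on `node` goes here ...
        "${t.f0}:${node.size()}"
    }
```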

Overall, Flink does not support "dynamic" data types very well.

Regards,
Maciej

On Wed, 24 Feb 2021 at 17:08, Lasse Nedergaard wrote:
>
> Hi
>
> I’m looking for advice for the best and simplest solution to handle JSON in 
> Flink.
>
> Our system is data driven and based on JSON. As the structure isn't static,
> mapping it to POJOs isn't an option, so I transfer ObjectNode and/or
> ArrayNode between operators, either in tuples
> (Tuple2) or as attributes in POJOs.
>
> Flink doesn’t know about Jackson objects and therefore fail back to Kryo
>
> I see two options.
> 1. Add kryo serialisation objects for all the Jackson types we use and 
> register them.
> 2. Add Jackson objects as Flink types.
>
> I guess option 2 performs best, but it requires an annotation for the classes 
> and I can't do that for 3rd-party objects. One workaround could be to create 
> my own objects that extend the Jackson objects and use them between 
> operators.
>
> I can’t be the first to solve this problem so I like to hear what the 
> community suggests.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>


Re: Object and Integer size in RocksDB ValueState

2021-02-24 Thread Maciej Obuchowski
Thanks Roman, that's exactly what I needed.

On Wed, 24 Feb 2021 at 14:37, Roman Khachatryan wrote:
>
> Thanks for the clarification.
>
> RocksDB stores whatever value Flink passes to it after serialization.
> The value is passed as an array of bytes, so the minimum is a single byte.
> An Integer would require 4 bytes, an Object 1 or 2 bytes depending on the serializer 
> (Pojo or Kryo), and a boolean just 1 byte.
> Besides that, boolean serialization is apparently faster.
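For illustration, a minimal sketch of the smallest-footprint variant this suggests - a Boolean marker serialized by Flink's BooleanSerializer - with illustrative key and element types:

```
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class BooleanDeduplicationFunction<T> : KeyedProcessFunction<String, T, T>() {

    private lateinit var processedState: ValueState<Boolean>

    override fun open(parameters: Configuration) {
        // Types.BOOLEAN uses BooleanSerializer, so the stored value is one byte per key.
        processedState = runtimeContext.getState(ValueStateDescriptor("processed", Types.BOOLEAN))
    }

    override fun processElement(value: T, ctx: KeyedProcessFunction<String, T, T>.Context, out: Collector<T>) {
        if (processedState.value() == null) { // first time this key is seen
            processedState.update(true)
            out.collect(value)
        }
    }
}
```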
>
> Sizes in memory, on disk and of snapshot are all affected proportionally.
>
> You are right that Flink compression settings will not have any impact
> with incremental checkpoints.
>
> Regards,
> Roman
>
>
> On Wed, Feb 24, 2021 at 11:01 AM Maciej Obuchowski wrote:
>>
>> Hey. Let me send a simplified example, because I don't think this
>> "(given that the actual stored objects (integers) are the same)" is
>> true - I'm just storing an Object as a placeholder:
>>
>> public class DeduplicationProcessFunction<KEY, IN> extends
>> KeyedProcessFunction<KEY, IN, IN> implements CheckpointedFunction {
>>
>>     private transient ValueState<Object> processedState;
>>
>>     public DeduplicationProcessFunction() { }
>>
>>     @Override
>>     public void snapshotState(FunctionSnapshotContext context) throws Exception { }
>>
>>     @Override
>>     public void initializeState(FunctionInitializationContext context) throws Exception {
>>         val descriptor = new ValueStateDescriptor<>("processed", TypeInformation.of(Object.class));
>>         processedState = context.getKeyedStateStore().getState(descriptor);
>>     }
>>
>>     @Override
>>     public void processElement(IN value, Context ctx, Collector<IN> out) throws Exception {
>>         val processed = processedState.value();
>>         if (processed == null) {
>>             processedState.update(new Object());
>>             out.collect(value);
>>         }
>>     }
>> }
>>
>>
>>
>> Basically, I'm not sure what RocksDB stores in this case. I'm sure
>> that it needs to store the key, which is a 32-byte SHA key in this case.
>> What's the value? Is it the 16 bytes that Java requires in memory? If
>> I change my ValueState to Integer, and provide an actual value
>> there, will it require more storage space? Also, to respond to your
>> point about compression, we're using incremental checkpoints, so I
>> don't think anything will change, as per the docs. I'm not only interested
>> in snapshot size, but also in the size of the current in-memory and local-disk
>> state.
>>
>> Thanks,
>> Maciej
>>
>>
>>
>> On Tue, 23 Feb 2021 at 17:53, Roman Khachatryan wrote:
>> >
>> > Hi Maciej,
>> >
>> > If I understand correctly, you're asking whether ValueState parameterized 
>> > with Object has the same size as the one with Integer (given that the 
>> > actual stored objects (integers) are the same).
>> > With RocksDB, any state object is serialized first and only then it is 
>> > stored in MemTable or in an SST file. So it doesn't matter as long as the 
>> > same serializer is used.
>> >
>> > You probably should try enabling compression if you didn't already: 
>> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Tue, Feb 23, 2021 at 12:40 PM Maciej Obuchowski wrote:
>> >>
>> >> Hey.
>> >>
>> >> We have a deduplication job that has a large amount of keyed ValueState. We
>> >> want to decrease state size as much as possible, so we're using
>> >> ValueState<Object> as Object is the smallest possible Java non-primitive. However,
>> >> as per https://www.baeldung.com/java-size-of-object (and my measurements)
>> >> Java Integer has the same memory size as Object due to padding.
>> >> Will this still be true with RocksDB state? Can we put Integer in state 
>> >> without increasing state size?
>> >>
>> >> Thanks, Maciej


Re: Object and Integer size in RocksDB ValueState

2021-02-24 Thread Maciej Obuchowski
Hey. Let me send a simplified example, because I don't think this
"(given that the actual stored objects (integers) are the same)" is
true - I'm just storing an Object as a placeholder:

public class DeduplicationProcessFunction<KEY, IN> extends
KeyedProcessFunction<KEY, IN, IN> implements CheckpointedFunction {

    private transient ValueState<Object> processedState;

    public DeduplicationProcessFunction() { }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception { }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        val descriptor = new ValueStateDescriptor<>("processed", TypeInformation.of(Object.class));
        processedState = context.getKeyedStateStore().getState(descriptor);
    }

    @Override
    public void processElement(IN value, Context ctx, Collector<IN> out) throws Exception {
        val processed = processedState.value();
        if (processed == null) {
            processedState.update(new Object());
            out.collect(value);
        }
    }
}



Basically, I'm not sure what RocksDB stores in this case. I'm sure
that it needs to store the key, which is a 32-byte SHA key in this case.
What's the value? Is it the 16 bytes that Java requires in memory? If
I change my ValueState to Integer, and provide an actual value
there, will it require more storage space? Also, to respond to your
point about compression, we're using incremental checkpoints, so I
don't think anything will change, as per the docs. I'm not only interested
in snapshot size, but also in the size of the current in-memory and local-disk
state.

Thanks,
Maciej



On Tue, 23 Feb 2021 at 17:53, Roman Khachatryan wrote:
>
> Hi Maciej,
>
> If I understand correctly, you're asking whether ValueState parameterized 
> with Object has the same size as the one with Integer (given that the actual 
> stored objects (integers) are the same).
> With RocksDB, any state object is serialized first and only then it is stored 
> in MemTable or in an SST file. So it doesn't matter as long as the same 
> serializer is used.
>
> You probably should try enabling compression if you didn't already: 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression
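Enabling it is a one-liner on the execution config (though, as noted above, it does not help with RocksDB incremental checkpoints):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment()
// Compresses keyed state in full snapshots/savepoints; no effect on incremental RocksDB checkpoints.
env.config.isUseSnapshotCompression = true
```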
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 12:40 PM Maciej Obuchowski wrote:
>>
>> Hey.
>>
>> We have a deduplication job that has a large amount of keyed ValueState. We
>> want to decrease state size as much as possible, so we're using
>> ValueState<Object> as Object is the smallest possible Java non-primitive. However, as
>> per https://www.baeldung.com/java-size-of-object (and my measurements) Java
>> Integer has the same memory size as Object due to padding.
>> Will this still be true with RocksDB state? Can we put Integer in state 
>> without increasing state size?
>>
>> Thanks, Maciej


Object and Integer size in RocksDB ValueState

2021-02-23 Thread Maciej Obuchowski
Hey.

We have a deduplication job that has a large amount of keyed ValueState. We
want to decrease state size as much as possible, so we're using
ValueState<Object> as Object is the smallest possible Java non-primitive. However,
as per https://www.baeldung.com/java-size-of-object (and my measurements)
Java Integer has the same memory size as Object due to padding.
Will this still be true with RocksDB state? Can we put Integer in state
without increasing state size?

Thanks, Maciej