Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-09 Thread Yun Gao
Hi Fuyao,

Logically, if a system wants to support end-to-end exactly once,
it should support transactions:
1. A transaction holds the records; it gets pre-committed on snapshot state
and committed once the checkpoint succeeds.
2. A transaction must still be abortable after it has been pre-committed.
3. Once pre-committed, a transaction must eventually be committable: even if
the Flink job fails between pre-commit and commit, it must be possible to
commit the transaction again after the job restarts.

If the external system meets these conditions, option b) is the recommended way
to implement an exactly-once sink. However, these interfaces are newly added in
the upcoming 1.15, so it might take about another 1.5 months before they are
released.

An earlier version of option b) is org.apache.flink.api.connector.sink.Sink.
It is very similar to option b) and has been supported since 1.13. It will still
be supported in the next several releases, and it can be migrated to option b)
easily.
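
For illustration, here is a very rough sketch of how the three conditions above
could map onto the hooks of the legacy TwoPhaseCommitSinkFunction from your
question. The OssTransaction / OssTxnClient types are hypothetical placeholders
for whatever transactional handle your HTTP SDK would expose, so this is a
sketch of the protocol rather than working code:

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Sketch only; OssTransaction / OssTxnClient are hypothetical SDK types.
public class OssExactlyOnceSink
        extends TwoPhaseCommitSinkFunction<String, OssTransaction, Void> {

    private transient OssTxnClient client; // would be created in open()

    public OssExactlyOnceSink(TypeSerializer<OssTransaction> txnSerializer) {
        super(txnSerializer, VoidSerializer.INSTANCE);
    }

    @Override
    protected OssTransaction beginTransaction() {
        return client.openTransaction();   // condition 1: records go into a transaction
    }

    @Override
    protected void invoke(OssTransaction txn, String value, Context context) {
        client.append(txn, value);         // write into the open transaction
    }

    @Override
    protected void preCommit(OssTransaction txn) {
        client.flush(txn);                 // condition 1: pre-commit on snapshot state
    }

    @Override
    protected void commit(OssTransaction txn) {
        client.commit(txn);                // condition 3: must also succeed after a restart
    }

    @Override
    protected void abort(OssTransaction txn) {
        client.abort(txn);                 // condition 2: abort after pre-commit
    }
}

The new TwoPhaseCommittingSink in 1.15 splits the same responsibilities between
a writer (which pre-commits on checkpoint) and a committer, but the underlying
protocol is the same.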

Best,
Yun



 --Original Mail --
Sender:Fuyao Li 
Send Date:Thu Feb 10 07:01:51 2022
Recipients:user 
Subject:Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink 
to achieve EXACTLY_ONCE sink?

Hello Community,
I have two questions regarding Flink custom sink with EXACTLY_ONCE semantic.

I have an SDK that can publish messages over HTTP (backed by Oracle 
Streaming Service --- very similar to Kafka). This will be my Flink 
application’s sink. Is it possible to use this SDK as a sink with EXACTLY_ONCE 
semantics? HTTP is stateless here… If possible, what would need to be added to 
the SDK to support EXACTLY_ONCE?
If question 1 is possible, then I need to implement a custom sink for 
this. Which option should I use?
Option 1: TwoPhaseCommitSinkFunction
Option 2: StatefulSink + TwoPhaseCommittingSink
The legacy FlinkKafkaProducer seems to use option 1, and it will be removed 
from Flink in the future. The new KafkaSink seems to use option 2. Based on 
the comment in the code, it seems option 1 is recommended; which one should 
I use? Please let me know if I am missing anything, or if there is any 
better solution for my case.
Thanks,
Fuyao


Flink SQL kafka debezium CDC and postgreSQL

2022-02-09 Thread Francis Conroy
Hi all,

I'm using flink 1.13.5 (as I was originally using the ververica Flink CDC
connector) and am trying to understand something.
I'm just using the Flink SQL CLI at this stage to verify that I can stream
a PostgreSQL table into Flink SQL to compute a continuous materialised
view. I was originally inspecting the Kafka messages being published by
Debezium and noticed that they were incredibly verbose, including all
new/old values; this was because I was using REPLICA IDENTITY FULL on
the source tables.

I've now created unique indexes using the primary keys and REPLICA IDENTITY
USING INDEX [INDEX]. I understand that the changed rows can now be matched
using their index row, meaning we don't need to send the before contents of
the row to identify it. When running my simple select * query on the table
I get the following error:

Flink SQL> select * from devices_device;

[ERROR] Could not execute SQL statement.
Reason: java.lang.IllegalStateException: The "before" field of UPDATE
message is null, if you are using Debezium Postgres Connector, please check
the Postgres table has been set REPLICA IDENTITY to FULL level.

My table definition:

CREATE TABLE devices_device
(
    id                    INT NOT NULL,
    legacy_device_type_id INT,
    endpoint_id           INT NOT NULL,
    logical_device_id     INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'django-db.public.devices_device',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'debezium-json'
);

The message makes perfect sense, but I can't quite understand why I can't
use REPLICA IDENTITY USING INDEX. Does anyone know if this was a decision
that was made at some point, or if it's not technically possible for some
reason?

Note: I will change to using REPLICA IDENTITY FULL so I can continue
working for now, but it's not something I want to put into production.

Thanks for your consideration!



Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-09 Thread Fuyao Li
Hello Community,


I have two questions regarding Flink custom sink with EXACTLY_ONCE semantic.


  1.  I have an SDK that can publish messages over HTTP (backed by Oracle 
Streaming Service --- very similar to Kafka). This will be my Flink 
application’s sink. Is it possible to use this SDK as a sink with EXACTLY_ONCE 
semantics? HTTP is stateless here… If possible, what would need to be added to 
the SDK to support EXACTLY_ONCE?
  2.  If question 1 is possible, then I need to implement a custom sink 
for this. Which option should I use?
 *   Option 1: TwoPhaseCommitSinkFunction
 *   Option 2: StatefulSink + TwoPhaseCommittingSink

The legacy FlinkKafkaProducer seems to use option 1, and it will be 
removed from Flink in the future. The new KafkaSink seems to use option 2. 
Based on the comment in the code, it seems option 1 is recommended; which 
one should I use? Please let me know if I am missing anything, or if there 
is any better solution for my case.


Thanks,
Fuyao







unpredictable behaviour on KafkaSource deserialisation error

2022-02-09 Thread Frank Dekervel

Hello,

When trying to reproduce a bug, we made a DeserialisationSchema that 
throws an exception when a malformed message comes in.
Then, we sent a malformed message together with a number of well formed 
messages to see what happens.


val source = KafkaSource.builder[OurMessage]()
  .setValueOnlyDeserializer(new BuggySchema(tolerateInvalidIncomingJSON))
  .setBootstrapServers(bootstrap_servers)
  .setTopics("mqtt")
  .setGroupId("flink")
  .setProperty("isolation.level", "read_committed")
  .setStartingOffsets(OffsetsInitializer.latest())
  .setProperty("transaction.timeout.ms", "90")
  .setProperty("partition.discovery.interval.ms", "6")
  .build;

to simulate our slow API we did this:

val pusher = alarms.map(x => { Thread.sleep(8000); x.toString() })
pusher.sinkTo(buildAtLeastOnceStringSink(bootstrap_servers, p, "alm_log"))
  .uid("almlog").name("almlog")


Then, we injected a lot of messages, and also one invalid message. We 
repeated this test multiple times. And this is where things got weird: I 
would expect the job to fail, restart, and fail again (since, upon 
restarting, it should reprocess the same invalid message).
Sometimes this indeed happens, but sometimes we get the exception only 
once and then the application remains in the "running" state without 
continuous crashing and recovery. I think this is somehow related to the 
earlier issue we saw with the duplicated messages.


However, that's not what always happens: sometimes the job fails, 
restarts, and then keeps running. And sometimes it goes into my 
(expected) restart loop. Until I understand what's going on, I have disabled 
Flink task failure recovery and rely on the Flink k8s operator to 
restart the job on failure. But I'd like to understand what's happening.
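
(For completeness, disabling Flink's own restarts looks roughly like the
following sketch, in Java for illustration, assuming the job-level restart
strategy is what is meant by "task failure recovery":)

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// no Flink-internal restarts: any task failure fails the whole job, and the
// k8s operator is left to resubmit it
env.setRestartStrategy(RestartStrategies.noRestart());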


As a side note: in our pipeline we switch a couple of times between the 
Table API and the DataStream API. Could that influence the failure zone 
determination?


Thanks!

Greetings,
Frank



Re: Changing StateBackend for Statefun Harness E2E tests

2022-02-09 Thread Igal Shilman
Hi Christopher,
Great to hear you've solved it, and thanks for sharing your findings with
the community!

Indeed RocksDB is a separate component that has to be added as a
dependency.

On Wed, Feb 9, 2022 at 3:55 PM Christopher Gustafson  wrote:

> Solved it, and posting here in case anyone runs into the same issue!
>
>
> Since the Harness uses StreamExecutionEnvironment to set the flink
> configurations, you have to set the state backend explicitly, as described
> here:
>
> https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L59
>
>
> All the best
> --
> *Från:* Christopher Gustafson
> *Skickat:* den 9 februari 2022 13:34:05
> *Till:* user@flink.apache.org
> *Ämne:* Changing StateBackend for Statefun Harness E2E tests
>
>
> Hi everyone,
>
>
> I am looking into the code of StateFun, trying to understand how it works.
> I was trying to run the Harness E2E in my IDE, and tried to change the
> StateBackend to rocksdb, at which point I got an error saying it wasn't
> found. My first question then becomes, why is this? Shouldn't the regular
> Flink including the rocksdb state backend be included as a dependency?
> Since I am a bit confused about the layout, it would be much appreciated if
> someone could provide a description of how the StateFun project relates to
> the Flink codebase, and of how one can work effectively with the two in an
> IDE.
>
>
> Thanks,
>
> Christopher
>


Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-09 Thread saravana...@gmail.com
Is there any way to identify the last message inside a RichFunction in BATCH
mode?



On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com 
wrote:

> I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
> DataStream api. mapPartition is not available in Flink DataStream.
> *Current Code using Flink 1.12.x DataSet :*
>
> dataset
>     .
>     .mapPartition(new SomeMapParitionFn())
>     .
>
> public static class SomeMapPartitionFn extends RichMapPartitionFunction {
>
>     @Override
>     public void mapPartition(Iterable records, Collector out) throws Exception {
>         for (InputModel record : records) {
>             /*
>              do some operation
>              */
>             if (/* some condition based on processing MULTIPLE records */) {
>                 out.collect(...); // Conditional collect   ---> (1)
>             }
>         }
>
>         // At the end of the data, collect
>         out.collect(...); // Collect processed data   ---> (2)
>     }
> }
>
> - (1): Collector.collect is invoked based on some condition after
>   processing a few records.
> - (2): Collector.collect is invoked at the end of the data.
>
> Initially we thought of using flatMap instead of mapPartition, but the
> collector is not available in the close function.
>
> https://issues.apache.org/jira/browse/FLINK-14709 - only available in
> the case of chained drivers.
>
> How to implement this in Flink 1.14.x DataStream? Please advise...
>
> *Note*: Our application works with only finite set of data (Batch Mode)
>
>
>


Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-09 Thread saravana...@gmail.com
I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
DataStream api. mapPartition is not available in Flink DataStream.
*Current Code using Flink 1.12.x DataSet :*

dataset
    .
    .mapPartition(new SomeMapParitionFn())
    .

public static class SomeMapPartitionFn extends RichMapPartitionFunction {

    @Override
    public void mapPartition(Iterable records, Collector out) throws Exception {
        for (InputModel record : records) {
            /*
             do some operation
             */
            if (/* some condition based on processing MULTIPLE records */) {
                out.collect(...); // Conditional collect   ---> (1)
            }
        }

        // At the end of the data, collect
        out.collect(...); // Collect processed data   ---> (2)
    }
}


   - (1): Collector.collect is invoked based on some condition after
     processing a few records.
   - (2): Collector.collect is invoked at the end of the data.

   Initially we thought of using flatMap instead of mapPartition, but the
   collector is not available in the close function.

   https://issues.apache.org/jira/browse/FLINK-14709 - only available in
   the case of chained drivers.

How to implement this in Flink 1.14.x DataStream? Please advise...

*Note*: Our application works with only a finite set of data (Batch Mode)
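
The closest thing I could sketch so far is a custom operator that implements
BoundedOneInput, so that endInput() can play the role of the end-of-data
collect in (2); is something like this the intended replacement? (A rough
sketch only, with String standing in for our actual input/output types and the
buffering logic simplified:)

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Sketch: in BATCH mode endInput() is called once after the last record,
// which is where the final out.collect(...) of mapPartition would go.
public class MapPartitionLikeOperator
        extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String>, BoundedOneInput {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void processElement(StreamRecord<String> element) {
        buffer.add(element.getValue());
        if (buffer.size() >= 1000) {   // stands in for (1): conditional collect
            output.collect(new StreamRecord<>("partial: " + buffer.size()));
            buffer.clear();
        }
    }

    @Override
    public void endInput() {           // stands in for (2): collect at end of data
        output.collect(new StreamRecord<>("final: " + buffer.size()));
    }
}

It would be applied with something like
dataStream.transform("mapPartitionLike", Types.STRING, new MapPartitionLikeOperator()).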


Question about both using managed memory with RocksDB and tuning other options

2022-02-09 Thread LM Kang
Hi community,


According to the docs of Flink and RocksDB, if we set the 
`state.backend.rocksdb.memory.managed` option to `true`, the memory budget of 
the memtables and the block cache will be controlled by the WriteBufferManager 
and the Cache, according to the given ratios.

Based on this premise, how will the other related parameters (e.g. block size / 
block cache size / write buffer count / write buffer size) be set? Do they just 
keep their own default values, or are new values actively set by the 
WriteBufferManager?

Furthermore, if we arbitrarily set these parameters or configure 
PredefinedOptions (e.g. in SPINNING_DISK_OPTIMIZED_HIGH_MEM, the block cache 
size is 256MB), will these parameters take effect? If they do, how can we 
ensure that they agree with the WriteBufferManager?
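
To make the question concrete, this is roughly the combination I mean (a
sketch only; the yaml keys are the documented state.backend.rocksdb.* options,
and SPINNING_DISK_OPTIMIZED_HIGH_MEM is just the example profile from above):

// flink-conf.yaml:
//   state.backend.rocksdb.memory.managed: true
//   state.backend.rocksdb.memory.write-buffer-ratio: 0.5

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
// ...while the job itself also picks a profile that sets its own block cache size:
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
env.setStateBackend(backend);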

Many thanks.



huge number of duplicate numbers after experiencing a crash loop in deserializer

2022-02-09 Thread Frank Dekervel

Hello,

Due to a malformed message in our input queue (Kafka), our 
DeserialisationSchema threw an exception, making our Flink application 
crash. Since our application was configured to restart, it restarted, 
only to reprocess the same malformed message and crash again.


This happened for a while until we fixed the job and made the 
DeserialisationSchema return null on malformed messages. After that, we 
restarted the job from the last successful savepoint (just before the 
malformed message) and got a huge number of duplicate messages 
generated by our processors (seemingly one message for every time Flink 
restarted).
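
(The fix was essentially the following, shown here as a Java sketch with
OurMessage and the Jackson mapping as placeholders for our actual schema:)

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

// Sketch of the "return null on malformed input" fix.
public class TolerantSchema extends AbstractDeserializationSchema<OurMessage> {

    private transient ObjectMapper mapper;

    @Override
    public OurMessage deserialize(byte[] message) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.readValue(message, OurMessage.class);
        } catch (Exception e) {
            // returning null makes the source skip the record instead of failing the job
            return null;
        }
    }
}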


In addition to this, one of our map functions had side effects, and these 
side effects were also executed a huge number of times. We basically 
have this topology:


Kafka source --> stateful process function with event-time timers --> 
(slow) map function with side effect (post to external API) --> Kafka 
sink (at least once)


We don't use the exactly-once sink (instead we use at-least-once), 
because an occasional duplicate would not harm us and we need low 
latency. However, having a massive number of duplicates is a problem.


So I'm trying to understand how checkpoints and savepoints really work, and 
in what situation we could end up with a massive amount of duplicates. 
The only way I could think of is the following scenario:


 * the application starts up
 * the stateful process function treats some incoming messages from
   Kafka and generates some outgoing messages
 * the slow map function starts processing these messages, and at the
   same time a checkpoint is saved (somehow without new Kafka offsets ???)
 * the application crashes on the malformed input

then again:

 * application restarts
 * the stateful process function treats the same incoming messages from
   Kafka again, generating exactly the same in-flight messages
   (we use deterministic IDs for these messages and we see the
   same ID being generated over and over).
 * a checkpoint is saved with more in-flight messages; the map function
   is slow and hence doesn't catch up
 * the application crashes again on the same input.

Are in-flight messages stored in a checkpoint somehow? Is the above 
scenario even possible? (Reading the design of Flink I would think not, 
but then I have no other explanation.) We had this once before in the past 
(then it was a crash in another branch of the same dataflow).


Greetings,
Frank





RichAsyncFunction + Cache or Map State?

2022-02-09 Thread Clayton Wohl
I have a RichAsyncFunction that does async queries to an external database.
I'm using a Guava cache within the Flink app. I'd like this Guava cache to
be serialized with the rest of Flink state in checkpoint/savepoints.
However, RichAsyncFunction doesn't support the state functionality at all.

There is one Guava cache for the entire Flink app which might make this
scenario simpler.

Is there a recommended way to handle this situation?

Also, the Flink MapState interface doesn't support check-and-set type
functionality and doesn't support lock-free concurrent use like
java.util.concurrent.ConcurrentMap and Guava's cache do. I need both of
these features for proper concurrent operation. So even if I could use
Flink MapState, that doesn't seem like a practical solution.


SV: Changing StateBackend for Statefun Harness E2E tests

2022-02-09 Thread Christopher Gustafson
Solved it, and posting here in case anyone runs into the same issue!


Since the Harness uses StreamExecutionEnvironment to set the Flink 
configuration, you have to set the state backend explicitly, as described here:
https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L59
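
In code, that boils down to something like this (a sketch, assuming the
flink-statebackend-rocksdb dependency is on the classpath):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set the backend explicitly instead of relying on the 'state.backend' config key,
// since the Harness builds its own StreamExecutionEnvironment configuration
env.setStateBackend(new EmbeddedRocksDBStateBackend());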


All the best


Från: Christopher Gustafson
Skickat: den 9 februari 2022 13:34:05
Till: user@flink.apache.org
Ämne: Changing StateBackend for Statefun Harness E2E tests


Hi everyone,


I am looking into the code of StateFun, trying to understand how it works. I 
was trying to run the Harness E2E in my IDE, and tried to change the 
StateBackend to rocksdb, at which point I got an error saying it wasn't found. 
My first question then becomes: why is this? Shouldn't the regular Flink, 
including the RocksDB state backend, be included as a dependency? Since I am a 
bit confused about the layout, it would be much appreciated if someone could 
provide a description of how the StateFun project relates to the Flink 
codebase, and of how one can work effectively with the two in an IDE.


Thanks,

Christopher


Changing StateBackend for Statefun Harness E2E tests

2022-02-09 Thread Christopher Gustafson
Hi everyone,


I am looking into the code of StateFun, trying to understand how it works. I 
was trying to run the Harness E2E in my IDE, and tried to change the 
StateBackend to rocksdb, at which point I got an error saying it wasn't found. 
My first question then becomes: why is this? Shouldn't the regular Flink, 
including the RocksDB state backend, be included as a dependency? Since I am a 
bit confused about the layout, it would be much appreciated if someone could 
provide a description of how the StateFun project relates to the Flink 
codebase, and of how one can work effectively with the two in an IDE.


Thanks,

Christopher


Regarding delayed reading at source with larger checkpoint timeouts

2022-02-09 Thread Johny Rufus John
Hi Team,

I have a use case where, in my Kafka source, I need to wait for 2 hours
before handling an event. Currently the plan is as follows; kindly let me
know if this would work without issues, and about any gotchas I need to be
aware of.

a) In the Kafka consumer deserialization schema, look at the published
timestamp of the event; if it has been sitting in the Kafka queue for < 2
hours, sleep for the remaining amount (see the sketch after this list).
b) Increase the checkpoint timeout to, say, 3 hours, so that checkpoints succeed.
c) This also implies that offset commits will happen after about 2 hours for
some events, once checkpointing succeeds; are there any internal configs that
would interfere with this?
d) We are using the old FlinkKafkaConsumer (1.11); are there any issues w.r.t.
using this one?
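
A sketch of what I have in mind for (a), assuming the old FlinkKafkaConsumer
and a plain String payload (a placeholder for the real event type):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch of plan (a): block in the deserializer until the record is 2 hours old.
public class DelayingSchema implements KafkaDeserializationSchema<String> {

    private static final long DELAY_MS = 2 * 60 * 60 * 1000L;

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        long age = System.currentTimeMillis() - record.timestamp();
        if (age < DELAY_MS) {
            // holds the fetch loop, hence the long checkpoint timeout in (b)
            Thread.sleep(DELAY_MS - age);
        }
        return new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}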

Thanks for your advice.

Thanks,
Johny


Re: Flink's Data sources

2022-02-09 Thread Martijn Visser
Hi Mohan,

It's not clear to me what you're trying to ask on the Flink User
mailing list. I don't recognize the table that you've included. Based on
previous emails, you're asking the Flink user mailing list questions about a
comparison between Flink and Kafka Connect. The Flink User mailing list
focuses on answering user questions related to Flink only, not on
comparisons with other tools.

There is an overview of Flink connectors for DataStream use cases [1] and
Table API use cases [2].

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/

On Wed, 9 Feb 2022 at 07:44, mohan radhakrishnan <
radhakrishnan.mo...@gmail.com> wrote:

> Hi,
>
> The source for Flink (or even Kafka) is a problem we find hard to solve.
> This data seems to indicate that the source could be MQ. Is there a need to
> pull from MQ to Hive and then write to Flink? What could the flow be?
>
> Kafka Connect workers can issue JDBC queries and pull data into Kafka. Is
> there an equivalent toolset for Flink? Or should we pull into Kafka and pick
> it up using Flink (checkpointing using Kafka consumer offsets)?
>
>
> Source of continuous data:
>   - Flink: Kafka, File Systems, other message queues
>   - Kafka (with Kafka Connect): strictly Kafka, with Kafka Connect serving to
>     address the data-into / data-out-of-Kafka problem
>
> Sink for results:
>   - Flink: Kafka, other MQs, file system, analytical database, key/value
>     stores, stream processor state, and other external systems
>   - Kafka (with Kafka Connect): Kafka, application state, operational
>     database or any external system
>
> Thanks,
> Mohan
>


Re: JSONKeyValueDeserializationSchema cannot be converted to ObjectNode>

2022-02-09 Thread HG
Sorry to have bothered everyone.

This is the obvious solution:

.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false)))


Regards Hans-Peter


Op di 8 feb. 2022 om 21:56 schreef Roman Khachatryan :

> Hi,
>
> setDeserializer() expects KafkaRecordDeserializationSchema;
> JSONKeyValueDeserializationSchema you provided is not compatible with
> it.
> You can convert it using [1]
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema-
>
>
> Regards,
> Roman
>
> On Tue, Feb 8, 2022 at 5:43 PM HG  wrote:
> >
> > Hi all,
> >
> > When I build this code:
> >
> > KafkaSource source = KafkaSource.builder()
> > .setProperties(kafkaProps)
> > .setProperty("ssl.truststore.type",trustStoreType)
> > .setProperty("ssl.truststore.password",trustStorePassword)
> > .setProperty("ssl.truststore.location",trustStoreLocation)
> > .setProperty("security.protocol",securityProtocol)
> > .setProperty("partition.discovery.interval.ms",
> partitionDiscoveryIntervalMs)
> > .setProperty("commit.offsets.on.checkpoint",
> commitOffsetsOnCheckpoint)
> > .setGroupId(groupId)
> > .setTopics(kafkaInputTopic)
> > .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> >
>  
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> > .build();
> >
> >
> > I get:
> > This error:
> >
> > error: incompatible types:
> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
> cannot be converted to
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
> > .setDeserializer(new
> JSONKeyValueDeserializationSchema(false))
> >
> >
> > What am I doing wrong?
> > As per the documentation JSONKeyValueDeserializationSchema returns an
> ObjectNode.
> >
> > Regards Hans
> >
>


Re: JSONKeyValueDeserializationSchema cannot be converted to ObjectNode>

2022-02-09 Thread HG
Hi
Convert ??
How does that work?
Can you spare a couple of lines for that?

Regards Hans

Op di 8 feb. 2022 om 21:56 schreef Roman Khachatryan :

> Hi,
>
> setDeserializer() expects KafkaRecordDeserializationSchema;
> JSONKeyValueDeserializationSchema you provided is not compatible with
> it.
> You can convert it using [1]
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema-
>
>
> Regards,
> Roman
>
> On Tue, Feb 8, 2022 at 5:43 PM HG  wrote:
> >
> > Hi all,
> >
> > When I build this code:
> >
> > KafkaSource source = KafkaSource.builder()
> > .setProperties(kafkaProps)
> > .setProperty("ssl.truststore.type",trustStoreType)
> > .setProperty("ssl.truststore.password",trustStorePassword)
> > .setProperty("ssl.truststore.location",trustStoreLocation)
> > .setProperty("security.protocol",securityProtocol)
> > .setProperty("partition.discovery.interval.ms",
> partitionDiscoveryIntervalMs)
> > .setProperty("commit.offsets.on.checkpoint",
> commitOffsetsOnCheckpoint)
> > .setGroupId(groupId)
> > .setTopics(kafkaInputTopic)
> > .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> >
>  
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> > .build();
> >
> >
> > I get:
> > This error:
> >
> > error: incompatible types:
> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
> cannot be converted to
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
> > .setDeserializer(new
> JSONKeyValueDeserializationSchema(false))
> >
> >
> > What am I doing wrong?
> > As per the documentation JSONKeyValueDeserializationSchema returns an
> ObjectNode.
> >
> > Regards Hans
> >
>