Re: Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-21 Thread Asimansu Bera
Hello Sachin,

Typically, cloud VMs are ephemeral: if the EMR cluster goes down, or VMs
must be shut down for security updates or due to faults, new VMs are added
to the cluster. As a result, any data stored in the local file system, such
as file:///tmp, would be lost. To ensure persistence and avoid losing the
checkpoint or savepoint data needed for recovery, it is advisable to store
such data in a persistent storage solution like HDFS or S3.

Generally, the Hadoop NameNode (NN) on an EMR cluster listens on port 8020.
You can find the NameNode host from the EMR console, or read it from the
cluster's Hadoop configuration as sketched below.
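
For example, a minimal flink-conf.yaml sketch (the NameNode host below is a
made-up EMR-style hostname, not something from this thread):

    state.backend.type: rocksdb
    state.backend.incremental: true
    # hypothetical NameNode host; 8020 is the usual NN RPC port on EMR
    state.checkpoints.dir: hdfs://ip-10-0-0-1.ec2.internal:8020/flink-checkpoints
    # or persist to S3 instead (assumes the S3 filesystem is available, as on EMR)
    # state.checkpoints.dir: s3://my-bucket/flink-checkpoints

On the master node, the configured NameNode address can also be read straight
from the Hadoop configuration with the standard Hadoop CLI:

    hdfs getconf -confKey fs.defaultFS    # e.g. hdfs://ip-10-0-0-1.ec2.internal:8020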

Hope this helps.

-A


On Thu, Mar 21, 2024 at 10:54 PM Sachin Mittal  wrote:

> Hi,
> We are using AWS EMR, where we submit our Flink jobs to a long-running
> Flink cluster on YARN.
>
> We wanted to configure RocksDBStateBackend as our state backend to store
> our checkpoints.
>
> So we have configured the following properties in our flink-conf.yaml:
>
>- state.backend.type: rocksdb
>- state.checkpoints.dir: file:///tmp
>- state.backend.incremental: true
>
>
> My question here is regarding the checkpoint location: what is the
> difference between using a local filesystem location vs. a Hadoop
> distributed file system (HDFS)?
>
> What advantages do we get if we use:
>
> *state.checkpoints.dir*: hdfs://namenode-host:port/flink-checkpoints
> vs
> *state.checkpoints.dir*: file:///tmp
>
> Also, if we decide to use HDFS, where can we get the value for
> *namenode-host:port*, given we are running Flink on EMR?
>
> Thanks
> Sachin
>
>
>


Understanding RocksDBStateBackend in Flink on Yarn on AWS EMR

2024-03-21 Thread Sachin Mittal
Hi,
We are using AWS EMR, where we submit our Flink jobs to a long-running
Flink cluster on YARN.

We wanted to configure RocksDBStateBackend as our state backend to store
our checkpoints.

So we have configured the following properties in our flink-conf.yaml:

   - state.backend.type: rocksdb
   - state.checkpoints.dir: file:///tmp
   - state.backend.incremental: true


My question here is regarding the checkpoint location: what is the
difference between using a local filesystem location vs. a Hadoop
distributed file system (HDFS)?

What advantages do we get if we use:

*state.checkpoints.dir*: hdfs://namenode-host:port/flink-checkpoints
vs
*state.checkpoints.dir*: file:///tmp

Also, if we decide to use HDFS, where can we get the value for
*namenode-host:port*, given we are running Flink on EMR?

Thanks
Sachin


Re: Re: Read data from elasticsearch using Java flink

2024-03-21 Thread Fidea Lidea
Hi Xuyang

I am new to Flink and don't know how to use this dependency in my code.
Can you please share some examples that I can refer to?

Thanks
Nida


On Fri, Mar 22, 2024 at 7:02 AM Xuyang  wrote:

> Hi, Nida.
> Can you explain in more detail what "unable to use it" means? Did you get
> an exception when using it?
>
>
> --
> Best!
> Xuyang
>
>
> At 2024-03-21 21:14:53, "Fidea Lidea" wrote:
>
> Thank you Xuyang.
> I added the above flink-connector-elasticsearch dependency to my project,
> but I am unable to use it.
> Can you please share a few code samples that use this dependency?
>
> Thanks
> Nida
>
> On Tue, Mar 12, 2024 at 5:37 PM Xuyang  wrote:
>
>> Hi, Fidea.
>> Currently, Elasticsearch is not supported as a source; you can see the
>> JIRA ticket [1] for more details.
>> You can also cherry-pick this PR [2] onto your own branch and build a
>> custom Elasticsearch connector to use directly.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-25568
>> [2] https://github.com/apache/flink-connector-elasticsearch/pull/62
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2024-03-12 18:28:46, "Fidea Lidea"  wrote:
>>
>> Hi,
>>
>> I am trying to read data from Elasticsearch and store it in a stream.
>> Could you please share a few examples to *read*/get all data from
>> Elasticsearch using Java.
>>
>>
>> Thanks,
>>
>>
>>
>>
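
Until FLINK-25568 lands, there is no official Elasticsearch source
connector. A minimal interim sketch (assuming the Elasticsearch 7
high-level REST client is on the classpath; the class name, host, and
index below are illustrative placeholders, not an official API) wraps the
scroll API in a simple, non-parallel SourceFunction:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchScrollRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.unit.TimeValue; // org.elasticsearch.core.TimeValue on newer 7.x clients
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    // Illustrative only: emits every document of one index as a JSON string.
    public class ElasticsearchScrollSource extends RichSourceFunction<String> {
        private transient RestHighLevelClient client;
        private volatile boolean running = true;

        @Override
        public void open(Configuration parameters) {
            // "localhost:9200" and "my-index" below are placeholders for your cluster.
            client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("localhost", 9200, "http")));
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            SearchRequest request = new SearchRequest("my-index");
            request.source(new SearchSourceBuilder().size(1000));
            request.scroll(TimeValue.timeValueMinutes(1L));
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            String scrollId = response.getScrollId();
            while (running && response.getHits().getHits().length > 0) {
                for (SearchHit hit : response.getHits().getHits()) {
                    ctx.collect(hit.getSourceAsString()); // raw _source JSON
                }
                // fetch the next page of the scroll
                SearchScrollRequest scroll = new SearchScrollRequest(scrollId);
                scroll.scroll(TimeValue.timeValueMinutes(1L));
                response = client.scroll(scroll, RequestOptions.DEFAULT);
                scrollId = response.getScrollId();
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void close() throws Exception {
            if (client != null) {
                client.close();
            }
        }
    }

Register it with env.addSource(new ElasticsearchScrollSource()). Note this
sketch is single-threaded and not fault tolerant (the scroll position is
not checkpointed), which is exactly what the proper source in the PR above
is meant to provide.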


Re:Re: Read data from elasticsearch using Java flink

2024-03-21 Thread Xuyang
Hi, Nida.
Can you explain in more detail what "unable to use it" means? Did you get
an exception when using it?




--

Best!
Xuyang




At 2024-03-21 21:14:53, "Fidea Lidea" wrote:

Thank you Xuyang.
I added the above flink-connector-elasticsearch dependency to my project,
but I am unable to use it.
Can you please share a few code samples that use this dependency?

Thanks 
Nida


On Tue, Mar 12, 2024 at 5:37 PM Xuyang  wrote:

Hi, Fidea.
Currently, Elasticsearch is not supported as a source; you can see the
JIRA ticket [1] for more details.
You can also cherry-pick this PR [2] onto your own branch and build a
custom Elasticsearch connector to use directly.


[1] https://issues.apache.org/jira/browse/FLINK-25568
[2] https://github.com/apache/flink-connector-elasticsearch/pull/62



--

Best!
Xuyang




At 2024-03-12 18:28:46, "Fidea Lidea"  wrote:

Hi,

I am trying to read data from Elasticsearch and store it in a stream.
Could you please share a few examples to read/get all data from
Elasticsearch using Java.



Thanks,

 



Re: Global connection open and close

2024-03-21 Thread Péter Váry
Hi Jacob,

Flink jobs and their tasks typically run on multiple nodes/servers, so it
is not possible to share a single connection at the job level.

You can read about the architecture in more detail in the docs. [1]

I hope this helps,
Péter

[1] -
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/flink-architecture/
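
That said, a common workaround is to share one connection per TaskManager
JVM through a reference-counted static holder: every operator acquires it
in open() and releases it in close(), and only the last release actually
closes it. A minimal sketch (SharedConnection is a hypothetical helper,
not a Flink API):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    // Hypothetical helper: one shared JDBC connection per TaskManager JVM.
    public final class SharedConnection {
        private static Connection connection;
        private static int refCount = 0;

        private SharedConnection() {}

        // Call from RichFunction#open(); opens the connection only once per JVM.
        public static synchronized Connection acquire(String jdbcUrl) throws SQLException {
            if (connection == null) {
                connection = DriverManager.getConnection(jdbcUrl);
            }
            refCount++;
            return connection;
        }

        // Call from RichFunction#close(); the last release closes the connection.
        public static synchronized void release() throws SQLException {
            if (--refCount == 0 && connection != null) {
                connection.close();
                connection = null;
            }
        }
    }

Each RichFunction would call SharedConnection.acquire(...) in open() and
SharedConnection.release() in close(). Note the scope is the TaskManager
process, not the job: with several TaskManagers you still get one
connection per TaskManager, and the final close happens when the last
operator on that TaskManager closes.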

On Thu, Mar 21, 2024, 13:10 Jacob Rollings 
wrote:

> Hello,
>
> Is there a way in Flink to instantiate or open connections (to a cache/DB)
> at a global level, so that they can be reused across many process functions
> rather than being opened in each operator's open()? Along with opening, I
> also wanted to know if there is a way to close them at job stop, so that
> they are closed at the very end, after each operator's close() method has
> completed. Basically, the idea is to maintain a single instance at a global
> level and close its session as a last step after every operator's close()
> is complete.
>
>
> Regards,
> JB
>


Re: Read data from elasticsearch using Java flink

2024-03-21 Thread Fidea Lidea
Thank you Xuyang.
I added the above flink-connector-elasticsearch dependency to my project,
but I am unable to use it.
Can you please share a few code samples that use this dependency?

Thanks
Nida

On Tue, Mar 12, 2024 at 5:37 PM Xuyang  wrote:

> Hi, Fidea.
> Currently, Elasticsearch is not supported as a source; you can see the
> JIRA ticket [1] for more details.
> You can also cherry-pick this PR [2] onto your own branch and build a
> custom Elasticsearch connector to use directly.
>
> [1] https://issues.apache.org/jira/browse/FLINK-25568
> [2] https://github.com/apache/flink-connector-elasticsearch/pull/62
>
> --
> Best!
> Xuyang
>
>
> At 2024-03-12 18:28:46, "Fidea Lidea"  wrote:
>
> Hi,
>
> I am trying to read data from Elasticsearch and store it in a stream.
> Could you please share a few examples to *read*/get all data from
> Elasticsearch using Java.
>
>
> Thanks,
>
>
>
>


Global connection open and close

2024-03-21 Thread Jacob Rollings
Hello,

Is there a way in Flink to instantiate or open connections (to a cache/DB)
at a global level, so that they can be reused across many process functions
rather than being opened in each operator's open()? Along with opening, I
also wanted to know if there is a way to close them at job stop, so that
they are closed at the very end, after each operator's close() method has
completed. Basically, the idea is to maintain a single instance at a global
level and close its session as a last step after every operator's close()
is complete.


Regards,
JB


Re: can we use Scan Newly Added Tables without restarting the existing job?

2024-03-21 Thread 3pang zhu
Ok, I see.

On Wed, Mar 20, 2024 at 4:08 PM Hang Ruan wrote:

> Hi, 3pang zhu.
>
> This `Scan Newly Added Tables` feature requires restarting the job from
> a savepoint. For now, we cannot add new tables to a running job without
> restarting.
>
> Best,
> Hang
>
> On Wed, Mar 20, 2024 at 3:22 PM 3pang zhu wrote:
>
>> This link describes the usage of [Scan Newly Added Tables]:
>> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#scan-newly-added-tables
>> Can we use it without restarting the job? I have tried this patch, using
>> a scheduled task in MySqlSnapshotSplitAssigner#open(); when tables are
>> added more than twice, it hits this issue:
>> https://github.com/apache/flink-cdc/issues/2282
>>
>>
>>  .../flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java | 26 +++---
>>  .../flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java | 5 +++--
>>  2 files changed, 26 insertions(+), 5 deletions(-)
>>
>> diff --cc flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>> index 0536a262,0536a262..d52acc26
>> --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>> +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>> @@@ -56,7 -56,7 +56,9 @@@ import java.util.Set
>>   import java.util.concurrent.CopyOnWriteArrayList;
>>   import java.util.concurrent.ExecutorService;
>>   import java.util.concurrent.Executors;
>> ++import java.util.concurrent.ScheduledExecutorService;
>>   import java.util.concurrent.ThreadFactory;
>> ++import java.util.concurrent.TimeUnit;
>>   import java.util.stream.Collectors;
>>
>>   import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
>> @@@ -94,6 -94,6 +96,7 @@@ public class MySqlSnapshotSplitAssigne
>>       private MySqlChunkSplitter chunkSplitter;
>>       private boolean isTableIdCaseSensitive;
>>       private ExecutorService executor;
>> ++    private ScheduledExecutorService scheduledExecutor;
>>
>>       @Nullable private Long checkpointIdToFinish;
>>
>> @@@ -179,12 -179,12 +182,24 @@@
>>       @Override
>>       public void open() {
>>           chunkSplitter.open();
>> --        discoveryCaptureTables();
>> --        captureNewlyAddedTables();
>> --        startAsynchronouslySplit();
>> ++        if (scheduledExecutor == null) {
>> ++            ThreadFactory threadFactory =
>> ++                    new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
>> ++            this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
>> ++        }
>> ++        scheduledExecutor.scheduleAtFixedRate(
>> ++                () -> {
>> ++                    discoveryCaptureTables();
>> ++                    captureNewlyAddedTables();
>> ++                    startAsynchronouslySplit();
>> ++                },
>> ++                0,
>> ++                1,
>> ++                TimeUnit.MINUTES);
>>       }
>>
>>       private void discoveryCaptureTables() {
>> ++        LOG.info("start discovery capture tables");
>>           // discovery the tables lazily
>>           if (needToDiscoveryTables()) {
>>               long start = System.currentTimeMillis();
>> @@@ -216,6 -216,6 +231,7 @@@
>>       }
>>
>>       private void captureNewlyAddedTables() {
>> ++        LOG.info("start to capture newly added tables");
>>           if (sourceConfig.isScanNewlyAddedTableEnabled()) {
>>               // check whether we got newly added tables
>>               try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
>> @@@ -282,6 -282,6 +298,7 @@@
>>       }
>>
>>       private void startAsynchronouslySplit() {
>> ++        LOG.info("start asynchronously split");
>>           if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
>>               if (executor == null) {
>>                   ThreadFactory threadFactory =
>> @@@ -497,6 -497,6 +514,9 @@@
>>           if (executor != null) {
>>               executor.shutdown();
>>           }
>> ++        if (scheduledExecutor != null) {
>> ++            scheduledExecutor.shutdown();
>> ++        }
>>       }
>>
>
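
For completeness, the restart-based workflow the linked docs describe is a
stop-with-savepoint cycle. A sketch with the standard Flink CLI, where the
job ID, savepoint directory, and jar name are placeholders, and the option
name is per the mysql-cdc docs linked above:

    # 1. stop the running job, taking a savepoint first
    flink stop --savepointPath hdfs:///flink/savepoints <job-id>

    # 2. add the new tables to the MySQL CDC source's table list
    #    (with scan.newly-added-table.enabled = true in the source options)

    # 3. resume from the savepoint printed by step 1
    flink run -s hdfs:///flink/savepoints/savepoint-xxxx my-cdc-job.jar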


Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-21 Thread gongzhongqiang
Congratulations! Thanks for the great work!


Best,
Zhongqiang Gong

On Wed, Mar 20, 2024 at 9:36 PM Leonard Xu wrote:

> Hi devs and users,
>
> We are thrilled to announce that the donation of Flink CDC as a
> sub-project of Apache Flink has completed. We invite you to explore the new
> resources available:
>
> - GitHub Repository: https://github.com/apache/flink-cdc
> - Flink CDC Documentation:
> https://nightlies.apache.org/flink/flink-cdc-docs-stable
>
> After the Flink community accepted this donation [1], we completed
> software copyright signing, code repo migration, code cleanup, website
> migration, CI migration, GitHub issues migration, etc.
> Here I am particularly grateful to Hang Ruan, Zhongqiang Gong, Qingsheng
> Ren, Jiabao Sun, LvYanquan, loserwang1024 and other contributors for their
> contributions and help during this process!
>
>
> For all previous contributors: The contribution process has changed
> slightly to align with the main Flink project. To report bugs or suggest
> new features, please open tickets in Apache Jira
> (https://issues.apache.org/jira). Note that we will no longer accept
> GitHub issues for these purposes.
>
>
> Welcome to explore the new repository and documentation. Your feedback and
> contributions are invaluable as we continue to improve Flink CDC.
>
> Thanks everyone for your support and happy exploring Flink CDC!
>
> Best,
> Leonard
> [1] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
>
>