[jira] [Updated] (FLINK-35674) MySQL connector cause blocking when searching for binlog timestamps

2024-06-24 Thread Thorne (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thorne updated FLINK-35674:
---
Description: 
When a task starts multiple MySQL connector sources in timestamp startup mode at 
the same time, the search for the binlog timestamp can block the task, which may 
leave the source permanently unable to obtain data. 

 

1. I have four tables (products, orders, orders_copy, shipments) to capture in 
one task. I generated a large amount of binlog data for these four tables, for 
example 10 million.

2. I started the task in timestamp mode, and the products table did not receive 
any records.

!FBA32597-8783-4678-B391-E450148C1B30.png|width=550,height=264!

3. I tried starting it in timestamp mode again, but the orders_copy table did not 
receive any records.

!BF180441-9C61-40eb-B07C-A11F8BCEC2D0.png|width=557,height=230!

4. I debugged the code and found some problems:
{code:java}
// Class: org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils

private static String searchBinlogName(
        BinaryLogClient client, long targetMs, List<String> binlogFiles)
        throws IOException, InterruptedException {
    int startIdx = 0;
    int endIdx = binlogFiles.size() - 1;

    while (startIdx <= endIdx) {
        int mid = startIdx + (endIdx - startIdx) / 2;
        long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
        if (midTs < targetMs) {
            startIdx = mid + 1;
        } else if (targetMs < midTs) {
            endIdx = mid - 1;
        } else {
            return binlogFiles.get(mid);
        }
    }

    return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
}

private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile)
        throws IOException, InterruptedException {

    ArrayBlockingQueue<Long> binlogTimestamps = new ArrayBlockingQueue<>(1);
    BinaryLogClient.EventListener eventListener =
            event -> {
                EventData data = event.getData();
                if (data instanceof RotateEventData) {
                    // We skip RotateEventData because it does not contain the
                    // timestamp we are interested in.
                    return;
                }

                EventHeaderV4 header = event.getHeader();
                long timestamp = header.getTimestamp();
                if (timestamp > 0) {
                    binlogTimestamps.offer(timestamp);
                    try {
                        client.disconnect();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            };

    try {
        client.registerEventListener(eventListener);
        client.setBinlogFilename(binlogFile);
        client.setBinlogPosition(0);

        LOG.info("begin parse binlog: {}", binlogFile);
        client.connect();
    } finally {
        client.unregisterEventListener(eventListener);
    }
    return binlogTimestamps.take();
}
{code}
5. The call binlogTimestamps.take() blocks until the queue contains an element.

6. The binlogTimestamps queue never receives any data, so take() blocks forever.
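
To illustrate the blocking described in steps 5 and 6: the sketch below is not the connector's code and not the fix adopted for this issue. It is a minimal, self-contained example showing that a bounded poll(timeout, unit) on the same kind of queue returns instead of hanging when no listener ever offers a timestamp. The class BoundedTimestampWait and the helper awaitTimestamp are hypothetical names, and the -1 sentinel and 100 ms timeout are assumptions chosen for the demo.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Flink CDC.
final class BoundedTimestampWait {

    /**
     * Waits at most timeoutMs for a timestamp to appear in the queue.
     * Returns -1 (an assumed sentinel) if nothing arrives in time or the
     * waiting thread is interrupted.
     */
    static long awaitTimestamp(BlockingQueue<Long> timestamps, long timeoutMs) {
        try {
            // Unlike take(), poll(timeout, unit) gives up and returns null.
            Long ts = timestamps.poll(timeoutMs, TimeUnit.MILLISECONDS);
            return ts == null ? -1L : ts;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return -1L;
        }
    }

    public static void main(String[] args) {
        // Case 1: nothing is ever offered. take() would block forever here.
        BlockingQueue<Long> empty = new ArrayBlockingQueue<>(1);
        System.out.println(awaitTimestamp(empty, 100)); // prints -1

        // Case 2: a listener offered a timestamp before the wait started.
        BlockingQueue<Long> filled = new ArrayBlockingQueue<>(1);
        filled.offer(1719187200000L); // arbitrary epoch-millisecond value
        System.out.println(awaitTimestamp(filled, 100)); // prints 1719187200000
    }
}
```

With a bounded wait, the caller can log the binlog file name and fail or retry instead of leaving the source silently stuck. Whether a timeout is the right remedy for the connector depends on why the listener never delivers a timestamp.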


[jira] [Updated] (FLINK-35674) MySQL connector cause blocking when searching for binlog timestamps

2024-06-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35674:
---
Labels: pull-request-available  (was: )

> MySQL connector cause blocking when searching for binlog timestamps
> ---
>
> Key: FLINK-35674
> URL: https://issues.apache.org/jira/browse/FLINK-35674
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Environment: flink-cdc-3.1.x
> Reporter: Thorne
> Priority: Blocker
> Labels: pull-request-available
> Fix For: cdc-3.2.0
>
> Attachments: A7AE0D63-365D-4572-B63D-96DF5F096BF9.png, 
> BF180441-9C61-40eb-B07C-A11F8BCEC2D0.png, 
> FBA32597-8783-4678-B391-E450148C1B30.png



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35674) MySQL connector cause blocking when searching for binlog timestamps

2024-06-23 Thread Thorne (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thorne updated FLINK-35674:
---
Summary: MySQL connector cause blocking when searching for binlog 
timestamps  (was: MySQL connector may cause blocking when searching for binlog 
timestamps)



