Flink LAG-Function doesn't work as expected

2024-07-09 Thread Brandl, Johann
Hi everyone,
I'm new to Flink and have tried some queries with Flink SQL.

Currently I have a problem with the LAG function. I want to emit a new record 
when the ORDERS_ID changes. To do this, I use the LAG function to detect 
whether this has changed.
However, I noticed that every now and then I cannot access the ORDERS_ID of the 
previous message. It seems to have to do with the proctime I use in the ORDER 
BY with LAG. As soon as the proctime changes in the range of seconds, I cannot
access the previous value and LAG gives me NULL. Does anyone know what could cause this?
Here is the query I use:

CREATE TABLE UNITS_DATA(
  proctime AS PROCTIME()
, `IDENT` DOUBLE
, `STEPS_ID` DOUBLE
, `ORDERS_ID` DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'UNITS_DATA',
  'properties.bootstrap.servers' = 'http://myserver:9094',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://myserver:8080/apis/ccompat/v6/'
);

WITH current_and_previous as (
SELECT proctime
     , ud.STEPS_ID as STEPS_ID
     , ud.IDENT as UNITS_DATA_ID
     , ud.ORDERS_ID as ORDERS_ID
     , LAG(ud.ORDERS_ID, 1) OVER (PARTITION BY STEPS_ID ORDER BY proctime) PREV_ORDERS_ID
FROM UNITS_DATA ud
WHERE STEPS_ID=64911
)
select *
from current_and_previous;
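
For reference, the change-detection step this query is building toward could be expressed as a filter on PREV_ORDERS_ID. The sketch below is only an illustration run through the Table API; it assumes the UNITS_DATA table from the DDL above is already registered in the same TableEnvironment and does not address the NULL values described above:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OrdersChangeDetection {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Assumes the UNITS_DATA table from the CREATE TABLE statement above has been registered here.
        tEnv.executeSql(
                "WITH current_and_previous AS ("
                        + "  SELECT proctime,"
                        + "         ud.STEPS_ID  AS STEPS_ID,"
                        + "         ud.IDENT     AS UNITS_DATA_ID,"
                        + "         ud.ORDERS_ID AS ORDERS_ID,"
                        + "         LAG(ud.ORDERS_ID, 1) OVER (PARTITION BY STEPS_ID ORDER BY proctime) AS PREV_ORDERS_ID"
                        + "  FROM UNITS_DATA ud"
                        + "  WHERE STEPS_ID = 64911)"
                        + " SELECT *"
                        + " FROM current_and_previous"
                        // Keep only rows where the order id actually changed compared to the previous row.
                        + " WHERE ORDERS_ID IS DISTINCT FROM PREV_ORDERS_ID")
                .print();
    }
}
```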

Thanks in advance and best regards




Using BlobServer in FlinkDeployment

2024-07-09 Thread Saransh Jain
Hi all, I am deploying a FlinkDeployment CR in an Operator watched
namespace. I have passed these configs in the flinkConfiguration:

blob.server.address: "jobmanager"
blob.server.port: "6128"
blob.storage.directory: "/tmp/jars/"

There are a couple of jars that I don't want to make part of the image, but
they should be available in /opt/flink/lib/ in all pods. I am downloading
some jars using an initContainer in the JobManager pod to the /tmp/jars location
and then copying them into /opt/flink/lib/.
The expectation is that these jars are shipped to the TM pods' /tmp/jars/ (from where
we will move them to /opt/flink/lib/), but the jars never arrive on the TM pods.
There are no relevant error logs even at TRACE level. What am I missing?


Thanks & Regards
Saransh Jain


Encountering scala.matchError in Flink 1.18.1 Query

2024-07-08 Thread Norihiro FUKE
Hi, community

I encountered a scala.MatchError when trying to obtain the table plan for
the following query in Flink 1.18.1.

The input data is read from Kafka, and the query is intended to perform a
typical WordCount operation. The query is as follows. SPLIT_STRING is a
Table Function UDF that splits sentences into words by spaces.

```
SELECT
  window_start,
  word,
  COUNT(*) AS `count`
FROM
  TABLE(
    TUMBLE(TABLE input_table, DESCRIPTOR(proctime), INTERVAL '10' SECOND)),
  LATERAL TABLE(SPLIT_STRING(sentence)) AS T(word)
GROUP BY
  window_start,
  window_end,
  word
```
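
For context, a split UDF of this kind might look like the minimal sketch below; the actual SPLIT_STRING implementation is not shown in this thread, so this is only an assumed shape:

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Emits one row per whitespace-separated word of the input sentence.
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class SplitString extends TableFunction<Row> {
    public void eval(String sentence) {
        if (sentence == null) {
            return;
        }
        for (String word : sentence.split("\\s+")) {
            if (!word.isEmpty()) {
                collect(Row.of(word));
            }
        }
    }
}
```

It would be registered before running the query, e.g. with tEnv.createTemporarySystemFunction("SPLIT_STRING", SplitString.class).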

The error message received is:

```
[ERR] scala.MatchError:
rel#177237:FlinkLogicalCorrelate.LOGICAL.any.None:
0.[NONE].[NONE](left=RelSubset#177235,right=RelSubset#177236,correlation=$cor0,joinType=inner,requiredColumns={1})
(of class 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate)
```

I believe that the issue lies in the existNeighbourWindowTableFunc method
in flink-table-planner/WindowUtil.scala, where there is an unconsidered
node (FlinkLogicalCorrelate) when traversing the AST. (This method was
added in FLINK-32578.) I suspect this comes from the LATERAL clause. While
this query produced a FlinkLogicalCorrelate, I think there might be other
unhandled node types as well.

I have two questions regarding this:

   1. Is it expected behavior for scala.MatchError to occur in this
   case? In other words, I suspect this might be an unreported bug.
   2. In the code comments of the PR mentioned in the FLINK-32578 ticket, I
   found the terms "standard form" and "relax form." I searched for "relax
   form" in the Flink documentation but could not find any reference. As a
   workaround for this issue, using the WITH clause could be considered, but I
   am uncertain if this is a universal solution.

Thank you for your assistance.


Flink Session jobs goes to reconciling state

2024-07-08 Thread Fidea Lidea
Hi Team,

I have a few session jobs for running jars.
After creating the jobs, they go from a running state to a reconciling
or upgrading state.
How can I resolve this issue?

[image: image.png]


Thanks & Regards
Nida Shaikh


java.lang.OutOfMemory:null

2024-07-07 Thread 冯路路
Hi

A Flink job runs smoothly for a while, with both resources and data volume stable. After some time, it
suddenly reports java.lang.OutOfMemory:null while parsing a JSON object, and then CPU and memory usage climb
steeply until resources are completely exhausted and java.lang.OutOfMemory: java heap space is reported.
After adding more resources, the same problem appears again after a while. What could be the cause? If it is
a memory leak, why do CPU and memory run completely stable for a period first? Shouldn't they rise steadily
the whole time?




Re: Parallelism of state processor jobs

2024-07-06 Thread Alexis Sarda-Espinosa
Hi Junrui,

I think you understood correctly. What I'm seeing is that each vertex has a
single subtask, but multiple vertices are started in parallel in different
slots. That is not a problem in my case, I _want_ to parallelize the work,
it's just that this mechanism is very different from streaming jobs, where
the total number of slots in the cluster must equal the maximum vertex
parallelism---in other words, a streaming job won't use free slots
regardless of how many vertices there are, whereas a batch job needs one
slot per subtask in order to parallelize. I was not aware that batch jobs
interact with task manager slots like this.

The other thing I mentioned is what really remains as a problem: even
though the different vertices do start in parallel, some of them finish in
seconds and others in more than a minute, even though all of them do the
exact same transformation, just with different operator IDs. I'm using
Flink 1.18.1 btw.

Regards,
Alexis.

Am Sa., 6. Juli 2024 um 12:09 Uhr schrieb Junrui Lee :

> Hi Alexis,
>
> Could you clarify what you mean by "If I add more slots to the task
> manager, I see the transformations actually start in parallel even though I
> submit the job with 'flink run -p 1'"?
> Are you asking if multiple slots are working simultaneously, or if a
> single JobVertex contains multiple subtasks?
>
> In fact, the number of slots and parallelism are not the same concept.
> And Flink Batch jobs can run even with only a single slot, and when more
> slots become available, Flink will schedule and deploy more parallelizable
> tasks (unless their upstream tasks have not finished). If you want only one
> slot to be active at a time, you can limit the resources of the cluster —
> for instance, by setting "slotmanager.number-of-slots.max" to 1.
>
> If you intend for each JobVertex to have a parallelism of 1, and you find
> that this isn't being enforced when using the "flink run -p 1" command. In
> that case, it would be helpful to have more detailed information to assist
> with troubleshooting, including the version of Flink in use and the
> JobManager logs.
>
> Alexis Sarda-Espinosa  于2024年7月6日周六 15:35写道:
>
>> Hi Junrui,
>>
>> Thanks for the confirmation. I tested some more and I'm seeing a strange
>> behavior.
>>
>> I'm currently testing a single source stream that is fed to 6 identical
>> transformations. The state processor api requires batch mode and, from what
>> I can tell, I must specify a parallelism of 1 in the job, otherwise it
>> freezes. However, if I add more slots to the task manager, I see the
>> transformations actually start in parallel even though I submit the job
>> with "flink run -p 1". Is this expected of batch mode?
>>
>> Additionally, regardless of how much memory I give to the task manager,
>> some transformations finish in around 6 seconds, and then the others need
>> more than 1 minute even though it's the same transformation, and each one
>> writes around 70MB in my local disk. The flame graph shows the slow
>> operators are just parked due to an ArrayBlockingQueue whose size is hard
>> coded as 16 in the Flink sources. Am I missing something crucial for tuning
>> such jobs?
>>
>> Regards,
>> Alexis.
>>
>> On Sat, 6 Jul 2024, 03:29 Junrui Lee,  wrote:
>>
>>> Hi Alexis,
>>>
>>> For the SavepointWriter, I've briefly looked over the code and the write
>>> operation is enforced as non-parallel.
>>>
>>> Best,
>>> Junrui
>>>
>>> Alexis Sarda-Espinosa  于2024年7月6日周六 01:27写道:
>>>
 Hi Gabor,

 Thanks for the quick response. What about SavepointWriter? In my case
 I'm actually writing a job that will read from an existing savepoint and
 modify some of its data to write a new one.

 Regards,
 Alexis.

 Am Fr., 5. Juli 2024 um 17:37 Uhr schrieb Gabor Somogyi <
 gabor.g.somo...@gmail.com>:

> Hi Alexis,
>
> It depends. When one uses SavepointLoader to read metadata only then
> it's non-parallel.
> SavepointReader however is basically a normal batch job with all its
> features.
>
> G
>
>
> On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Really quick question, when using the state processor API, are all
>> transformations performed in a non-parallel fashion?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Parallelism of state processor jobs

2024-07-06 Thread Junrui Lee
Hi Alexis,

Could you clarify what you mean by "If I add more slots to the task
manager, I see the transformations actually start in parallel even though I
submit the job with 'flink run -p 1'"?
Are you asking if multiple slots are working simultaneously, or if a single
JobVertex contains multiple subtasks?

In fact, the number of slots and parallelism are not the same concept.  And
Flink Batch jobs can run even with only a single slot, and when more slots
become available, Flink will schedule and deploy more parallelizable tasks
(unless their upstream tasks have not finished). If you want only one slot
to be active at a time, you can limit the resources of the cluster — for
instance, by setting "slotmanager.number-of-slots.max" to 1.

If you intend for each JobVertex to have a parallelism of 1, and you find
that this isn't being enforced when using the "flink run -p 1" command, then
it would be helpful to have more detailed information to assist
with troubleshooting, including the version of Flink in use and the
JobManager logs.
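
For what it's worth, a minimal sketch of capping a cluster to one slot; note that slotmanager.number-of-slots.max is normally set in flink-conf.yaml on the cluster side, so the local-environment form below is only an illustration and may behave differently on a MiniCluster:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SingleSlotLocalEnv {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Cap the total number of slots the slot manager will register (cluster-side option).
        conf.setString("slotmanager.number-of-slots.max", "1");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);

        env.fromSequence(0, 9).print();
        env.execute("single-slot-demo");
    }
}
```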

Alexis Sarda-Espinosa  于2024年7月6日周六 15:35写道:

> Hi Junrui,
>
> Thanks for the confirmation. I tested some more and I'm seeing a strange
> behavior.
>
> I'm currently testing a single source stream that is fed to 6 identical
> transformations. The state processor api requires batch mode and, from what
> I can tell, I must specify a parallelism of 1 in the job, otherwise it
> freezes. However, if I add more slots to the task manager, I see the
> transformations actually start in parallel even though I submit the job
> with "flink run -p 1". Is this expected of batch mode?
>
> Additionally, regardless of how much memory I give to the task manager,
> some transformations finish in around 6 seconds, and then the others need
> more than 1 minute even though it's the same transformation, and each one
> writes around 70MB in my local disk. The flame graph shows the slow
> operators are just parked due to an ArrayBlockingQueue whose size is hard
> coded as 16 in the Flink sources. Am I missing something crucial for tuning
> such jobs?
>
> Regards,
> Alexis.
>
> On Sat, 6 Jul 2024, 03:29 Junrui Lee,  wrote:
>
>> Hi Alexis,
>>
>> For the SavepointWriter, I've briefly looked over the code and the write
>> operation is enforced as non-parallel.
>>
>> Best,
>> Junrui
>>
>> Alexis Sarda-Espinosa  于2024年7月6日周六 01:27写道:
>>
>>> Hi Gabor,
>>>
>>> Thanks for the quick response. What about SavepointWriter? In my case
>>> I'm actually writing a job that will read from an existing savepoint and
>>> modify some of its data to write a new one.
>>>
>>> Regards,
>>> Alexis.
>>>
>>> Am Fr., 5. Juli 2024 um 17:37 Uhr schrieb Gabor Somogyi <
>>> gabor.g.somo...@gmail.com>:
>>>
 Hi Alexis,

 It depends. When one uses SavepointLoader to read metadata only then
 it's non-parallel.
 SavepointReader however is basically a normal batch job with all its
 features.

 G


 On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
 sarda.espin...@gmail.com> wrote:

> Hello,
>
> Really quick question, when using the state processor API, are all
> transformations performed in a non-parallel fashion?
>
> Regards,
> Alexis.
>
>


flinkcdc????postgrep????????checkpoint????????????????????????

2024-07-06 Thread Eleven

PostgresSourceBuilder

Re: Parallelism of state processor jobs

2024-07-06 Thread Alexis Sarda-Espinosa
Hi Junrui,

Thanks for the confirmation. I tested some more and I'm seeing a strange
behavior.

I'm currently testing a single source stream that is fed to 6 identical
transformations. The state processor api requires batch mode and, from what
I can tell, I must specify a parallelism of 1 in the job, otherwise it
freezes. However, if I add more slots to the task manager, I see the
transformations actually start in parallel even though I submit the job
with "flink run -p 1". Is this expected of batch mode?

Additionally, regardless of how much memory I give to the task manager,
some transformations finish in around 6 seconds, and then the others need
more than 1 minute even though it's the same transformation, and each one
writes around 70MB in my local disk. The flame graph shows the slow
operators are just parked due to an ArrayBlockingQueue whose size is hard
coded as 16 in the Flink sources. Am I missing something crucial for tuning
such jobs?

Regards,
Alexis.

On Sat, 6 Jul 2024, 03:29 Junrui Lee,  wrote:

> Hi Alexis,
>
> For the SavepointWriter, I've briefly looked over the code and the write
> operation is enforced as non-parallel.
>
> Best,
> Junrui
>
> Alexis Sarda-Espinosa  于2024年7月6日周六 01:27写道:
>
>> Hi Gabor,
>>
>> Thanks for the quick response. What about SavepointWriter? In my case I'm
>> actually writing a job that will read from an existing savepoint and modify
>> some of its data to write a new one.
>>
>> Regards,
>> Alexis.
>>
>> Am Fr., 5. Juli 2024 um 17:37 Uhr schrieb Gabor Somogyi <
>> gabor.g.somo...@gmail.com>:
>>
>>> Hi Alexis,
>>>
>>> It depends. When one uses SavepointLoader to read metadata only then
>>> it's non-parallel.
>>> SavepointReader however is basically a normal batch job with all its
>>> features.
>>>
>>> G
>>>
>>>
>>> On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
 Hello,

 Really quick question, when using the state processor API, are all
 transformations performed in a non-parallel fashion?

 Regards,
 Alexis.




Re: Parallelism of state processor jobs

2024-07-05 Thread Junrui Lee
Hi Alexis,

For the SavepointWriter, I've briefly looked over the code and the write
operation is enforced as non-parallel.

Best,
Junrui

Alexis Sarda-Espinosa  于2024年7月6日周六 01:27写道:

> Hi Gabor,
>
> Thanks for the quick response. What about SavepointWriter? In my case I'm
> actually writing a job that will read from an existing savepoint and modify
> some of its data to write a new one.
>
> Regards,
> Alexis.
>
> Am Fr., 5. Juli 2024 um 17:37 Uhr schrieb Gabor Somogyi <
> gabor.g.somo...@gmail.com>:
>
>> Hi Alexis,
>>
>> It depends. When one uses SavepointLoader to read metadata only then it's
>> non-parallel.
>> SavepointReader however is basically a normal batch job with all its
>> features.
>>
>> G
>>
>>
>> On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Really quick question, when using the state processor API, are all
>>> transformations performed in a non-parallel fashion?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Re: Parallelism of state processor jobs

2024-07-05 Thread Alexis Sarda-Espinosa
Hi Gabor,

Thanks for the quick response. What about SavepointWriter? In my case I'm
actually writing a job that will read from an existing savepoint and modify
some of its data to write a new one.

Regards,
Alexis.

Am Fr., 5. Juli 2024 um 17:37 Uhr schrieb Gabor Somogyi <
gabor.g.somo...@gmail.com>:

> Hi Alexis,
>
> It depends. When one uses SavepointLoader to read metadata only then it's
> non-parallel.
> SavepointReader however is basically a normal batch job with all its
> features.
>
> G
>
>
> On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Really quick question, when using the state processor API, are all
>> transformations performed in a non-parallel fashion?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Parallelism of state processor jobs

2024-07-05 Thread Gabor Somogyi
Hi Alexis,

It depends. When one uses SavepointLoader to read metadata only then it's
non-parallel.
SavepointReader however is basically a normal batch job with all its
features.

G
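
For readers new to the API, a minimal read-side sketch of the state processor API in batch mode; the operator uid "my-operator", the state name "count", and the savepoint path are placeholders, not details from this thread:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadSavepointSketch {

    // Reads a hypothetical ValueState<Long> named "count" registered under operator uid "my-operator".
    static class CountReader extends KeyedStateReaderFunction<String, Long> {
        transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
            out.collect(count.value());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // the state processor API requires batch mode

        // Assumes the savepoint was written with a state backend compatible with HashMapStateBackend.
        SavepointReader savepoint =
                SavepointReader.read(env, "file:///tmp/savepoints/savepoint-xyz", new HashMapStateBackend());

        DataStream<Long> counts = savepoint.readKeyedState("my-operator", new CountReader());
        counts.print();

        env.execute("read-savepoint-sketch");
    }
}
```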


On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> Really quick question, when using the state processor API, are all
> transformations performed in a non-parallel fashion?
>
> Regards,
> Alexis.
>
>


Parallelism of state processor jobs

2024-07-05 Thread Alexis Sarda-Espinosa
Hello,

Really quick question, when using the state processor API, are all
transformations performed in a non-parallel fashion?

Regards,
Alexis.


Flink State and Filesystem sink

2024-07-05 Thread Alexandre KY
Hello,

I am trying to implement a satellite image processing chain. Satellite images
are stored as rasters, which are heavy (several GBs), in a FileSystem (I am
currently using HDFS for testing purposes but will move to S3 when I deploy it
on the cloud). So, in order to reduce processing time and save RAM, the rasters
are split into tiles to which we apply algorithms that can be parallelized. Once
these are done, we have to aggregate the processed tiles into a processed raster
(the output product). To do so, we write the processed tiles directly into the
processed raster (its skeleton was created in the FileSystem beforehand), which
allows us to write them as they are produced instead of waiting for all of them
to be processed, and saves RAM by not gathering them all on one node before
writing down the raster.

Since this is a peculiar format not handled by the FileSystem connector, and I
don't know whether the FileSystem sink can write several times to specific parts
of the same file, I decided to try my own implementation with a FlatMap that
receives the processed tiles and writes them to HDFS as they come, using rasterio
(a library for reading and writing rasters that is built on GDAL). A simple
solution is to open the connection in the FlatMap every time and write (there
shouldn't be any concurrent writing, since the tiles are keyed by raster id,
which will hand all the tiles of the same raster to the same task slot if I have
understood correctly). However, I was wondering if I could open the connection,
store the DatasetReader in the state, and use it in the FlatMap to write down
the tiles, to avoid reopening it every time a new processed tile is produced
(a raster can be divided into thousands of tiles).

To sum it up, here are my questions:
- Can the FileSystem sink write rasters and write several times to the
same file?
- Can Flink's state store Java objects such as a DatasetReader (which is
returned by rasterio.open(...))?

Sincerely,
Ky Alexandre
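
On the second question, a common pattern is to keep a non-serializable handle as a transient field of a rich function (opened in open(), closed in close()) rather than in Flink state. A minimal Java sketch follows, with hypothetical Tile and TileWriter types standing in for the rasterio/GDAL pieces described above:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical processed-tile record; a stand-in for the real tile type in the thread.
class Tile {
    String rasterId;
    int offsetX;
    int offsetY;
    byte[] data;
}

// Hypothetical writer interface; a stand-in for the rasterio DatasetReader/DatasetWriter.
interface TileWriter extends AutoCloseable {
    void write(String rasterId, int offsetX, int offsetY, byte[] data) throws Exception;
}

public class TileWritingFlatMap extends RichFlatMapFunction<Tile, String> {

    // Non-serializable handle kept as a transient field, NOT in Flink state.
    private transient TileWriter writer;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Opened once per parallel subtask and reused for every tile routed to this subtask.
        writer = openWriter("hdfs:///rasters/output");
    }

    @Override
    public void flatMap(Tile tile, Collector<String> out) throws Exception {
        // Write the tile into the pre-created raster "skeleton" at its offset.
        writer.write(tile.rasterId, tile.offsetX, tile.offsetY, tile.data);
        out.collect(tile.rasterId);
    }

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
    }

    // Placeholder for whatever actually opens the raster output (rasterio/GDAL on the Python side).
    private static TileWriter openWriter(String basePath) {
        return new TileWriter() {
            @Override
            public void write(String rasterId, int offsetX, int offsetY, byte[] data) { /* no-op stub */ }
            @Override
            public void close() { /* no-op stub */ }
        };
    }
}
```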



Re: puzzle on OperatorChain

2024-07-04 Thread Enric Ott
Thanks for your confirmation, Yunfeng.




------ Original Message ------
From: "Yunfeng Zhou"


Re: puzzle on OperatorChain

2024-07-04 Thread Yunfeng Zhou
Hi Enric,

Yes, even if there is only one operator, StreamTask will still
create an OperatorChain for it. OperatorChain provides an abstraction
for processing events like endInput, checkpoints and OperatorEvents in a
unified way, no matter how many operators are running in the
StreamTask. You may refer to StreamTask.operatorChain for its detailed
functionality.

Best,
Yunfeng

On Thu, Jul 4, 2024 at 5:14 PM Enric Ott <243816...@qq.com> wrote:
>
> Hello, guys:
>   Does Flink transform all operators (including the source operator) into an
> OperatorChain even if disableOperatorChaining was set to true, and even if the
> OperatorChain contains only a single operator?
>   Thanks.


Task manager memory go on increasing on idle stage also

2024-07-04 Thread Ganesh Walse
Hi All,

My task manager memory keeps increasing even during idle stages. Is there any
reason why this happens?

As a result of the above my job is failing.

Thanks in advance.

Thanks & regards,
Ganesh Walse


puzzle on OperatorChain

2024-07-04 Thread Enric Ott
Hello, guys:
 Does Flink transform all operators (including the source operator) into an
OperatorChain even if disableOperatorChaining was set to true, and even if the
OperatorChain contains only a single operator?
 Thanks.

Re: Postgres-CDC start replication fails after stop/start on flink stream

2024-07-04 Thread Yanquan Lv
Hi, David.
We've met a similar problem with the pg connection; the error message was 'Socket
is closed'. We put a lot of effort into investigating, but we couldn't
find the reason.
Then we modified the publication mode [1] and subscribed only to the changes of
certain tables, with the following connector options:
'decoding.plugin.name' = 'pgoutput',
"slot.name": "flink_cleaned_v2",
'debezium.publication.name' = 'flink_cleaned_v2_publication',
'debezium.publication.autocreate.mode'='filtered'

The publication and slot should be pre-created with SQL like the following:
CREATE PUBLICATION flink_cleaned_v2_publication FOR TABLE your_table;
SELECT pg_create_logical_replication_slot('flink_cleaned_v2', 'pgoutput');

This can reduce the network traffic sent to the client, and disconnections
hardly ever happen anymore. Maybe you can give this a try;
I'm looking forward to your feedback.
[1]
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-publication-autocreate-mode


David Bryson  于2024年7月3日周三 08:46写道:

> I have a flink stream using Postgres-CDC as a source.  It's been operating
> mostly fine, but I recently had to stop, and then start the stream again.
> The stream is never able to start again as the replication never completes
> and Flink enters a restart loop.
>
> Upon starting the cdc reader task issues a "START REPLICATION" call on the
> postgres primary, this call then spent 1-1.5 hours transferring data and
> the operator is 100% busy.
> I'm not sure why the connector would not resume from the most recent
> snapshot, as the configuration is for 'latest-offset'.  Here are the
> connector options:
>
>  "slot.name": "flink_cleaned_v2",
>  "heartbeat.interval.ms": "15000",
>  "scan.snapshot.fetch.size": "8192",
>  "debezium.max.queue.size": "2048",
>  "debezium.max.batch.size": "1024",
>  "scan.incremental.snapshot.enabled": "true",
>  "scan.incremental.snapshot.chunk.size": "80960",
>  "debezium.slot.drop.on.stop": "false",
>  "debezium.slot.max.retries": "15",
>  "debezium.slot.retry.delay.ms": "67000",
>
> The logs on the RDS suggest that the CDC client is disconnecting, and the
> logs on Flink seem to suggest the RDS is disconnecting.  I'm very confused
> by this as my wal_sender_timeout is 120s.  Are there other settings I
> should be adjusting? How can I figure out who is disconnecting from who?
> It really feels like a socket/keep alive timeout of some kind is being
> missed.
>
> Flink 1.18
> CDC 3.1.1
>
> The RDS logs:
>
> 2024-07-02 21:02:22 UTC:10.32.1.45(7501):postgres_rw@dendrometer:[24901]:LOG:
>  could not receive data from client: Connection reset by peer
> 2024-07-02 21:02:22 UTC:10.32.0.67(10827):postgres_rw@dendrometer:[26483]:LOG:
>  could not receive data from client: Connection reset by peer
>
> Flink throwable:
>
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:173)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
> unexpected exception while polling the records
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> 

Re: watermark and barrier

2024-07-03 Thread Yunfeng Zhou
Hi Enric,

OperatorCoordinator is a mechanism allowing subtasks of the same
operator to communicate with each other and thus unifying the behavior
of subtasks running on different machines. It has mainly been used in
source operators to distribute source splits. As for watermarks, there
are multiple strategies to generate them, and I cannot tell
immediately whether some of the strategies rely on SourceCoordinator.
You can refer to the following document for these strategies.
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
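
As a concrete illustration of one such strategy, a minimal bounded-out-of-orderness sketch (MyEvent and its timestamp field are placeholders, not types from this thread):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkSketch {

    // Hypothetical event type with an embedded event-time timestamp.
    public static class MyEvent {
        public long timestampMillis;
        public String payload;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<MyEvent> events = env.fromElements(new MyEvent());

        // Periodic watermarks generated per subtask at (max seen timestamp - 5s).
        DataStream<MyEvent> withWatermarks = events.assignTimestampsAndWatermarks(
                WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, previousTs) -> event.timestampMillis));

        withWatermarks.print();
        env.execute("watermark-sketch");
    }
}
```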

You are right that the CheckpointCoordinator is responsible for
triggering, monitoring and managing checkpoint barriers. Also here is
Flink's detailed description about checkpoints.
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/

Best,
Yunfeng

On Wed, Jul 3, 2024 at 10:55 AM Enric Ott <243816...@qq.com> wrote:
>
> Hello,Community:
>   Are watermark and checkpoint barrier just generated by source and 
> coordinated by OperatorCoordinator/CheckpointCoordinator ?
>   Any clues appreciated!
>
>
> -- Original Message --
> From: "Enric Ott" <243816...@qq.com>;
> Sent: Thursday, January 25, 2024, 2:54 PM
> To: "user";
> Subject: how to get flink accumulated sink record count
>
> Hi,Team:
> I was wondering how to get Flink's accumulated sink record count (just like the
> Flink UI displays); any help would be appreciated.


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.9.0 released

2024-07-03 Thread Peter Huang
Thanks for the effort, Gyula!


Best Regards
Peter Huang

On Wed, Jul 3, 2024 at 12:48 PM Őrhidi Mátyás 
wrote:

> Thank you, Gyula!
> Cheers
> On Wed, Jul 3, 2024 at 8:00 AM Gyula Fóra  wrote:
>
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink Kubernetes Operator 1.9.0.
> >
> > The Flink Kubernetes Operator allows users to manage their Apache Flink
> > applications and their lifecycle through native k8s tooling like kubectl.
> >
> > Release blogpost:
> >
> https://flink.apache.org/2024/07/02/apache-flink-kubernetes-operator-1.9.0-release-announcement/
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Maven artifacts for Flink Kubernetes Operator can be found at:
> >
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
> >
> > Official Docker image for Flink Kubernetes Operator can be found at:
> > https://hub.docker.com/r/apache/flink-kubernetes-operator
> >
> > The full release notes are available in Jira:
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12354417
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible!
> >
> > Regards,
> > Gyula Fora
> >
>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.9.0 released

2024-07-03 Thread Őrhidi Mátyás
Thank you, Gyula!
Cheers
On Wed, Jul 3, 2024 at 8:00 AM Gyula Fóra  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Kubernetes Operator 1.9.0.
>
> The Flink Kubernetes Operator allows users to manage their Apache Flink
> applications and their lifecycle through native k8s tooling like kubectl.
>
> Release blogpost:
> https://flink.apache.org/2024/07/02/apache-flink-kubernetes-operator-1.9.0-release-announcement/
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink Kubernetes Operator can be found at:
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>
> Official Docker image for Flink Kubernetes Operator can be found at:
> https://hub.docker.com/r/apache/flink-kubernetes-operator
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12354417
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Gyula Fora
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.9.0 released

2024-07-03 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.9.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release blogpost:
https://flink.apache.org/2024/07/02/apache-flink-kubernetes-operator-1.9.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator can be found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12354417

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


Flink LAG-Function doesn't work as expected

2024-07-03 Thread Brandl, Johann
Hi everyone,

I'm new to Flink and have tried some queries with Flink SQL.



Currently I have a problem with the LAG function. I want to emit a new record 
when the ORDERS_ID changes. To do this, I use the LAG function to detect 
whether this has changed.

However, I noticed that every now and then I cannot access the ORDERS_ID of the 
previous message. It seems to have to do with the proctime I use in the ORDER 
BY with LAG. As soon as the proctime changes in the range of seconds, I cannot
access the previous value and LAG gives me NULL. Does anyone know what could cause this?

Here is the query I use:



CREATE TABLE UNITS_DATA(
  proctime AS PROCTIME()
, `IDENT` DOUBLE
, `STEPS_ID` DOUBLE
, `ORDERS_ID` DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'UNITS_DATA',
  'properties.bootstrap.servers' = 'http://myserver:9094',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://myserver:8080/apis/ccompat/v6/'
);

WITH current_and_previous as (
SELECT proctime
     , ud.STEPS_ID as STEPS_ID
     , ud.IDENT as UNITS_DATA_ID
     , ud.ORDERS_ID as ORDERS_ID
     , LAG(ud.ORDERS_ID, 1) OVER (PARTITION BY STEPS_ID ORDER BY proctime) PREV_ORDERS_ID
FROM UNITS_DATA ud
WHERE STEPS_ID=64911
)
select *
from current_and_previous;



Thanks in advance and best regards





Re: Flink write ADLS show error: No FileSystem for scheme "file"

2024-07-03 Thread Xiao Xu
Hi, Gabor,


I'm curious why this happens with the Azure filesystem and not with other
filesystems (I tried S3 and it works fine).

Gabor Somogyi  于2024年7月2日周二 16:59写道:

> I see, thanks for sharing.
>
> The change what you've made makes sense. Let me explain the details.
> Each and every plugin has it's own class loader. The reason behind that is
> to avoid dependency collision with Flink's main class loader.
>
> I think if the mentioned change works when it's added as normal lib and
> not as a plugin then the code can be merged to main as-is.
>
> G
>
>
> On Thu, Jun 27, 2024 at 5:30 AM Xiao Xu  wrote:
>
>> Hi, Gabar,
>>
>> Thanks to reply, I make sure that not conflict in maven, all the hadoop
>> dependency is in provided scope,
>> and checked my result jar it not contains
>> (src/main/resources/META-INF/services).
>>
>> This is my pom:
>>
>> http://maven.apache.org/POM/4.0.0; 
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>> 4.0.0
>>
>> com.test.flink
>> flink-sync
>> 1.0-SNAPSHOT
>> jar
>>
>> Flink Quickstart Job
>>
>> 
>>   1.8
>>   1.8
>>   1.18.1
>>   1.8
>>   2.12
>>   3.2.0
>>3.3.4
>>2.16.0
>>3.2.0
>> 
>>
>> 
>>
>>   org.apache.flink
>>   flink-java
>>   ${flink.version}
>>   provided
>>
>>
>>
>>   org.apache.flink
>>   flink-streaming-java
>>   ${flink.version}
>>   provided
>>
>>
>>
>>   org.apache.flink
>>   flink-clients
>>   ${flink.version}
>>   provided
>>
>>
>>
>>   org.apache.flink
>>   flink-connector-files
>>   ${flink.version}
>>
>>
>>   org.apache.flink
>>   flink-connector-kafka
>>   3.1.0-1.18
>>
>>
>>
>>   org.apache.logging.log4j
>>   log4j-slf4j-impl
>>   ${log4j.version}
>>   runtime
>>   
>>  
>> slf4j-api
>> org.slf4j
>>  
>>   
>>
>>
>>   org.apache.logging.log4j
>>   log4j-api
>>   ${log4j.version}
>>   runtime
>>
>>
>>   org.apache.logging.log4j
>>   log4j-core
>>   ${log4j.version}
>>   runtime
>>
>>
>>
>>   org.apache.flink
>>   flink-azure-fs-hadoop
>>   ${flink.version}
>>   provided
>>
>> 
>> 
>>
>>   
>>  org.apache.maven.plugins
>>  maven-assembly-plugin
>>  3.0.0
>>  
>> false
>> 
>>jar-with-dependencies
>> 
>>  
>>  
>> 
>>make-assembly
>>package
>>
>>   single
>>
>> 
>>  
>>   
>>
>> 
>> 
>>
>>
>> And as in my reply on Stack Overflow, I found that the hadoop-common code at
>> https://github.com/apache/hadoop/blob/release-3.3.4-RC1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3374
>> does not load any filesystem. Digging into the ServiceLoader.load(FileSystem.class)
>> source code, it looks like a different class loader is used, which makes it load
>> no filesystem at all.
>> I changed ServiceLoader.load(FileSystem.class) to
>> ServiceLoader.load(FileSystem.class, FileSystem.class.getClassLoader()),
>> replaced the flink-fs-azure-hadoop plugin, and it works now.
>> So I'm not sure why that works.
>>
>> Gabor Somogyi  于2024年6月26日周三 16:52写道:
>>
>>> Hi Xiao,
>>>
>>> I'm not quite convinced that the azure plugin ruined your workload, I
>>> would take a look at the dependency graph you've in the pom.
>>> Adding multiple deps can conflict in terms of class loader services
>>> (src/main/resources/META-INF/services).
>>>
>>> As an example you've 2 such dependencies where
>>> org.apache.flink.core.fs.FileSystemFactory is in the jar.
>>> Hadoop core contains "flie" and the other one something different. Let's
>>> say you don't use service merge plugin in your
>>> maven project. Such case Hadoop core `file` entry will be just
>>> overwritten by the second one.
>>>
>>> Solution: Either avoid deps with conflicting services or add 
>>> ServicesResourceTransformer
>>> to your maven project.
>>>
>>> G
>>>
>>>
>>> On Wed, Jun 26, 2024 at 10:16 AM Xiao Xu  wrote:
>>>
 Hi, all

 I try to use Flink to write Azure Blob Storage which called ADLS, I put
 the flink-azure-fs-hadoop jar in plugins directory and when I start my
 write job it shows:

 Caused by: 

Re: Reminder: Help required to fix security vulnerabilities in Flink Docker image

2024-07-02 Thread elakiya udhayanan
Hi  Alexis and Gabor ,

Thanks for your valuable suggestions. We tried implementing them as suggested,
updating GOSU_VERSION to 1.17 along with a few other changes
on our end, and we see that all the JFrog Xray vulnerabilities are fixed.
Thanks for your support and help.

Thanks,
Elakiya

On Mon, Jun 24, 2024 at 10:02 AM elakiya udhayanan 
wrote:

> Hi  Alexis and Gabor ,
>
> Thanks for your valuable response and suggestions. Will try to work on the
> suggestions and get back to you if require more details.
>
> Thanks,
> Elakiya
>
> On Sun, Jun 23, 2024 at 10:12 PM Gabor Somogyi 
> wrote:
>
>> Hi Elakiya,
>>
>> I've just double checked the story and seems like the latest 1.17 gosu
>> release is not vulnerable.
>> Can you please try it out on your side? Alexis has written down how you
>> can bump the docker version locally:
>>
>> ---CUT-HERE---
>> ENV GOSU_VERSION 1.17
>> ---CUT-HERE---
>>
>> Please report back and we can discuss this further based on that...
>>
>> BR,
>> G
>>
>>
>> On Fri, Jun 21, 2024 at 7:16 PM elakiya udhayanan 
>> wrote:
>>
>>> Hi Team,
>>>
>>> I would like to remind about the request for the help required to fix
>>> the vulnerabilities seen in the Flink Docker image. Any help is appreciated.
>>>
>>> Thanks in advance.
>>>
>>> Thanks,
>>> Elakiya U
>>>
>>> On Tue, Jun 18, 2024 at 12:51 PM elakiya udhayanan 
>>> wrote:
>>>
 Hi Community,

 In one of our applications we are using a Fink Docker image and running
 Flink as a Kubernetes pod. As per policy, we tried scanning the Docker
 image for security vulnerabilities using JFrog XRay and we find that there
 are multiple critical vulnerabilities being reported as seen in the below
 table. This is the same case for the latest Flink version 1.19.0 as well

 | Severity | Direct Package               | Impacted Package     | Impacted Package Version | Fixed Versions      | Type   | CVE            |
 |----------|------------------------------|----------------------|--------------------------|---------------------|--------|----------------|
 | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.8, 1.20.3]    | Go     | CVE-2023-24538 |
 | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.9, 1.20.4]    | Go     | CVE-2023-24540 |
 | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.10, 1.20.5]   | Go     | CVE-2023-29404 |
 | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.10, 1.20.5]   | Go     | CVE-2023-29405 |
 | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.10, 1.20.5]   | Go     | CVE-2023-29402 |
 | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.16.9, 1.17.2]    | Go     | CVE-2021-38297 |
 | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.16.14, 1.17.7]   | Go     | CVE-2022-23806 |
 | Critical | sha256__0690274ef266a9a2f... | certifi              | 2020.6.20                | [2023.7.22]         | Python | CVE-2023-37920 |
 | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.12.6, 1.13beta1] | Go     | CVE-2019-11888 |
 | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.11.13, 1.12.8]   | Go     | CVE-2019-14809 |

 These vulnerabilities are related to the github.com/golang/go and
 certifi packages.

 Please help me address the questions below:
 - Is there any known workaround for these vulnerabilities while using the
 affected Flink versions?
 - Is there an ETA for a fix for these vulnerabilities in upcoming Flink
 releases?
 - Are there any specific steps recommended to mitigate these issues in
 the meantime?
 Any guidance or recommendations would be greatly appreciated.

 Thanks in advance

 Thanks,
 Elakiya U

>>>


watermark and barrier

2024-07-02 Thread Enric Ott
Hello, Community:
 Are watermarks and checkpoint barriers generated only by the source and
coordinated by OperatorCoordinator/CheckpointCoordinator?
 Any clues appreciated!




------ Original Message ------
From: "Enric Ott" <243816...@qq.com>
Sent: Thursday, January 25, 2024, 2:54 PM
To: "user"
Subject: how to get flink accumulated sink record count

Postgres-CDC start replication fails after stop/start on flink stream

2024-07-02 Thread David Bryson
I have a flink stream using Postgres-CDC as a source.  It's been operating
mostly fine, but I recently had to stop, and then start the stream again.
The stream is never able to start again as the replication never completes
and Flink enters a restart loop.

Upon starting, the CDC reader task issues a "START REPLICATION" call on the
postgres primary; this call then spends 1-1.5 hours transferring data while
the operator is 100% busy.
I'm not sure why the connector would not resume from the most recent
snapshot, as the configuration is for 'latest-offset'.  Here are the
connector options:

 "slot.name": "flink_cleaned_v2",
 "heartbeat.interval.ms": "15000",
 "scan.snapshot.fetch.size": "8192",
 "debezium.max.queue.size": "2048",
 "debezium.max.batch.size": "1024",
 "scan.incremental.snapshot.enabled": "true",
 "scan.incremental.snapshot.chunk.size": "80960",
 "debezium.slot.drop.on.stop": "false",
 "debezium.slot.max.retries": "15",
 "debezium.slot.retry.delay.ms": "67000",

The logs on the RDS suggest that the CDC client is disconnecting, and the
logs on Flink seem to suggest the RDS is disconnecting.  I'm very confused
by this as my wal_sender_timeout is 120s.  Are there other settings I
should be adjusting? How can I figure out who is disconnecting from who?
It really feels like a socket/keep alive timeout of some kind is being
missed.

Flink 1.18
CDC 3.1.1

The RDS logs:

2024-07-02 21:02:22 UTC:10.32.1.45(7501):postgres_rw@dendrometer:[24901]:LOG:
 could not receive data from client: Connection reset by peer
2024-07-02 21:02:22 UTC:10.32.0.67(10827):postgres_rw@dendrometer:[26483]:LOG:
 could not receive data from client: Connection reset by peer

Flink throwable:

java.lang.RuntimeException: One or more fetchers have encountered exception
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:173)
at
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
unexpected exception while polling the records
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: java.io.IOException:
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException:
An exception occurred in the change event producer. This connector will be
restarted.
at
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 more
Caused by:
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException:
An exception occurred in the change event producer. This connector will be
restarted.
at
io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
at
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:214)
at

Re: [External] Re: [External] Re:Re: Backpressure issue with Flink Sql Job

2024-07-02 Thread Ashish Khatkar via user
Hi Penny,

As the join is an unwindowed join, we are performing the backfill since the
data started becoming available.

On Mon, Jul 1, 2024 at 3:14 PM Penny Rastogi  wrote:

> Hi Ashish,
>
> How are you performing the backfill operation? Some time window? Can you
> specify details?
>
> I mean ,if it helps, you can check out
> https://www.ververica.com/blog/how-to-write-fast-flink-sql .
>
>
> Regards
>
> On Tue, Jun 25, 2024 at 4:30 PM Ashish Khatkar via user <
> user@flink.apache.org> wrote:
>
>> Hi Xuyang,
>>
>> The input records are balanced across subtasks, with debloating buffers
>> enabled, the records this subtask receives is less as compared to other
>> subtasks.
>>
>> If the differences among all subtasks are not significant, we might be
>>> encountering an IO bottleneck. In this case, we could try increasing the
>>> parallelism of this vertex, or, as Penny suggested, we could try to enhance
>>> the memory of tm.
>>
>>
>> I checked k8s metrics for the taskmanagers and I don't see any IO issues,
>> I can relook at it again at the time we started backfill. Regarding
>> parallelism, I don't think Flink sql allows separate parallelism for a
>> specific operator. I can try to increase the parallelism for the entire job
>> (which will probably be counter productive for sources as they have 20
>> partitions). Regarding memory, I have already given 72G mem to each
>> taskmanager (slot, 1 tm. = 1 slot). We are currently migrating join use
>> cases from our in house system that we built on top of datastream API and
>> that system never required this much of resources and we never faced any
>> such issues for this job.
>>
>> On Tue, Jun 25, 2024 at 7:30 AM Xuyang  wrote:
>>
>>> Hi, Ashish.
>>>
>>> Can you confirm whether, on the subtask label page of this sink
>>> materializer node, the input records for each subtask are approximately the
>>> same?
>>>
>>> If the input records for subtask number 5 are significantly larger
>>> compared to the others, it signifies a serious data skew, and it would be
>>> necessary to modify the SQL appropriately to resolve this skew.
>>>
>>> If the differences among all subtasks are not significant, we might be
>>> encountering an IO bottleneck. In this case, we could try increasing the
>>> parallelism of this vertex, or, as Penny suggested, we could try to enhance
>>> the memory of tm.
>>>
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>>
>>> 在 2024-06-24 21:28:58,"Penny Rastogi"  写道:
>>>
>>> Hi Ashish,
>>> Can you check a few things.
>>> 1. Is your source broker count also 20 for both topics?
>>> 2. You can try increasing the state operation memory and reduce the
>>> disk I/O.
>>>
>>>    - Increase the number of CU resources in a single slot.
>>>    - Set optimization parameters:
>>>      - taskmanager.memory.managed.fraction=x
>>>      - state.backend.rocksdb.block.cache-size=x
>>>      - state.backend.rocksdb.writebuffer.size=x
>>> 3. If possible, try a left window join for your streams.
>>> - Please share what sink you are using. Also, the per-operator,
>>> source and sink throughput, if possible?
>>>
>>>
>>> On Mon, Jun 24, 2024 at 3:32 PM Ashish Khatkar via user <
>>> user@flink.apache.org> wrote:
>>>
 Hi all,

 We are facing backpressure in the flink sql job from the sink and the
 backpressure only comes from a single task. This causes the checkpoint to
 fail despite enabling unaligned checkpoints and using debloating buffers.
 We enabled flamegraph and the task spends most of the time doing rocksdb
 get and put. The sql job does a left join over two streams with a
 parallelism of 20. The total data the topics have is 540Gb for one topic
 and roughly 60Gb in the second topic. We are running 20 taskmanagers with 1
 slot each with each taskmanager having 72G mem and 9 cpu.
 Can you provide any help on how to go about fixing the pipeline? We are
 using Flink 1.17.2. The issue is similar to this stackoverflow thread
 ,
 instead of week it starts facing back pressure as soon as the lag comes
 down to 4-5%.

 [image: image.png]

>>>


What the default partition assignment strategy for KafkaSourceBuilder

2024-07-02 Thread Lei Wang
I have a simple Flink job that consumes messages from a Kafka topic and does some
calculation.
The topic has 48 partitions; I set the parallelism to 48 as well and
expected each parallel subtask to consume one partition.
But after submitting the job I found that 5 subtasks are consuming
two partitions each and 5 subtasks are doing nothing.

Why does this happen? What is the default partition assignment strategy?

Thanks, Lei
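
For context, a minimal sketch of the kind of setup described; the topic, servers and group id are placeholders, and the source parallelism is pinned to the partition count so that, in the expected case, each subtask reads one partition:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("my-topic")                // placeholder; the thread's topic has 48 partitions
                .setGroupId("my-group")               // placeholder
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                // One subtask per partition is the expectation discussed above.
                .setParallelism(48)
                .print();

        env.execute("kafka-parallelism-sketch");
    }
}
```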


Re: Fail to pull image autoscaling

2024-07-02 Thread Enric Ott
Hello, Community:
 Is there a reactive scheduling approach in Flink that scales down in response
to reduced input traffic, for Flink deployments on Kubernetes?
 Thanks.




------ Original Message ------
From: "Enric Ott" <243816...@qq.com>
Sent: Friday, June 28, 2024, 10:26
To: "user"

I deployed the autoscaling example
(https://github.com/apache/flink-kubernetes-operator/blob/main/examples/autoscaling/autoscaling.yaml)
on K8S, but encountered the error "fail to pull the image autoscaling-example". Could anyone give me
some clues to work this out?
Thanks.





------ Original Message ------
From: "Enric Ott" <243816...@qq.com>
Sent: Tuesday, June 25, 2024, 3:28
To: "Rion Williams"

Re: Flink write ADLS show error: No FileSystem for scheme "file"

2024-07-02 Thread Gabor Somogyi
I see, thanks for sharing.

The change you've made makes sense. Let me explain the details.
Each and every plugin has its own class loader. The reason behind that is
to avoid dependency collisions with Flink's main class loader.

I think if the mentioned change also works when it's added as a normal lib and not
as a plugin, then the code can be merged to main as-is.

G
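
For readers following along, the difference the quoted change makes, reduced to a minimal sketch; this only illustrates the ServiceLoader call discussed below and is not a patch against Hadoop itself:

```java
import java.util.ServiceLoader;

import org.apache.hadoop.fs.FileSystem;

public class FileSystemServiceLoading {

    private static int count(ServiceLoader<FileSystem> loader) {
        int n = 0;
        for (FileSystem fs : loader) {
            System.out.println(fs.getClass().getName());
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        // Default form: uses the thread context class loader; inside a Flink plugin
        // class loader this may not see hadoop-common's META-INF/services entries.
        int viaContext = count(ServiceLoader.load(FileSystem.class));

        // Form from the quoted message: use the class loader that owns FileSystem
        // (and its service files), so implementations such as the "file" scheme are found.
        int viaOwner = count(ServiceLoader.load(FileSystem.class, FileSystem.class.getClassLoader()));

        System.out.println("context class loader: " + viaContext + ", owning class loader: " + viaOwner);
    }
}
```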


On Thu, Jun 27, 2024 at 5:30 AM Xiao Xu  wrote:

> Hi, Gabar,
>
> Thanks to reply, I make sure that not conflict in maven, all the hadoop
> dependency is in provided scope,
> and checked my result jar it not contains
> (src/main/resources/META-INF/services).
>
> This is my pom:
>
> http://maven.apache.org/POM/4.0.0; 
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
> 4.0.0
>
> com.test.flink
> flink-sync
> 1.0-SNAPSHOT
> jar
>
> Flink Quickstart Job
>
> 
>   1.8
>   1.8
>   1.18.1
>   1.8
>   2.12
>   3.2.0
>3.3.4
>2.16.0
>3.2.0
> 
>
> 
>
>   org.apache.flink
>   flink-java
>   ${flink.version}
>   provided
>
>
>
>   org.apache.flink
>   flink-streaming-java
>   ${flink.version}
>   provided
>
>
>
>   org.apache.flink
>   flink-clients
>   ${flink.version}
>   provided
>
>
>
>   org.apache.flink
>   flink-connector-files
>   ${flink.version}
>
>
>   org.apache.flink
>   flink-connector-kafka
>   3.1.0-1.18
>
>
>
>   org.apache.logging.log4j
>   log4j-slf4j-impl
>   ${log4j.version}
>   runtime
>   
>  
> slf4j-api
> org.slf4j
>  
>   
>
>
>   org.apache.logging.log4j
>   log4j-api
>   ${log4j.version}
>   runtime
>
>
>   org.apache.logging.log4j
>   log4j-core
>   ${log4j.version}
>   runtime
>
>
>
>   org.apache.flink
>   flink-azure-fs-hadoop
>   ${flink.version}
>   provided
>
> 
> 
>
>   
>  org.apache.maven.plugins
>  maven-assembly-plugin
>  3.0.0
>  
> false
> 
>jar-with-dependencies
> 
>  
>  
> 
>make-assembly
>package
>
>   single
>
> 
>  
>   
>
> 
> 
>
>
> And like my reply in stackoverflow, I found the hadoop-common file :
> https://github.com/apache/hadoop/blob/release-3.3.4-RC1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3374
> do not load any filesystem, dig in ServiceLoader.load(FileSystem.class)
> source code, it looks like have different class loader  make it not load
> any filesystem.
> I changed the ServiceLoader.load(FileSystem.class)  to 
> ServiceLoader.load(FileSystem.class,
> FileSystem.class.getClassLoader()) and replace the flink-fs-azure-hadoop
> plugin, it works now,
> So I'm not sure why it works
>
> Gabor Somogyi  于2024年6月26日周三 16:52写道:
>
>> Hi Xiao,
>>
>> I'm not quite convinced that the azure plugin ruined your workload, I
>> would take a look at the dependency graph you've in the pom.
>> Adding multiple deps can conflict in terms of class loader services
>> (src/main/resources/META-INF/services).
>>
>> As an example you've 2 such dependencies where
>> org.apache.flink.core.fs.FileSystemFactory is in the jar.
>> Hadoop core contains "flie" and the other one something different. Let's
>> say you don't use service merge plugin in your
>> maven project. Such case Hadoop core `file` entry will be just
>> overwritten by the second one.
>>
>> Solution: Either avoid deps with conflicting services or add 
>> ServicesResourceTransformer
>> to your maven project.
>>
>> G
>>
>>
>> On Wed, Jun 26, 2024 at 10:16 AM Xiao Xu  wrote:
>>
>>> Hi, all
>>>
>>> I try to use Flink to write Azure Blob Storage which called ADLS, I put
>>> the flink-azure-fs-hadoop jar in plugins directory and when I start my
>>> write job it shows:
>>>
>>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>>> FileSystem for scheme "file"
>>> at
>>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>> at
>>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>> at
>>> 

Re: Re: Re: Re: flink1.18 on yarn: submitting a job creates multiple applications

2024-07-02 Thread Liu Join
Thank you!!!


Good luck,
Liu

From: Xuyang 
Sent: July 2, 2024 14:00
To: user-zh@flink.apache.org 
Subject: Re: Re: Re: flink1.18 on yarn: submitting a job creates multiple applications

You can refer to this [1].


Tip: the community's newer syntax EXECUTE STATEMENT SET BEGIN ... END; works as well, as does begin statement set; ...
end;


[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/insert/#insert-into-multiple-tables


--

Best!
Xuyang


On 2024-07-02 11:42:32, "Liu Join" wrote:
>Hi, thanks for the reply. Is there a page in the official docs? I'd like to read up on the details. Thanks!
>
>From: Xuyang 
>Sent: July 2, 2024 11:25
>To: user-zh@flink.apache.org 
>Subject: Re: flink1.18 on yarn: submitting a job creates multiple applications
>
>Hi, if you don't want two applications to show up, try using a statement set to put the two DML
>statements together. Otherwise they are treated as two separate operations and split into two applications.
>
>
>sql: begin statement set; ...  end;
>
>java & scala table api: tableEnv#createStatementSet
>
>
>--
>
>Best!
>Xuyang
>
>
>On 2024-07-02 10:04:34, "Liu Join" wrote:
>>Version: Flink 1.18, Hadoop 3.0.0
>>Submission mode: per-job
>>
>>Problem:
>>1. A program written in Flink SQL, with the same data source, whose output defines one Doris
>>sink plus a print of a table-to-stream conversion: submitting the jar to YARN creates two applications, one whose output is the Doris
>>sink and another whose output is the table-to-stream print.
>>
>>2. A program written in Flink SQL, with the same data source, whose output defines two Doris
>>sinks, table a and table b: submitting the jar to YARN creates two applications, one writing to Doris sink table a and the other to Doris sink table b.
>>
>>
>>What is the reason for this?


Re:Stateful Function Roadmap

2024-07-02 Thread Xuyang
Hi, Ran.

As far as I know, the development of StateFun has currently come to a
standstill. You may need to fork the repo and pick out what you need.




You can see the latest mail thread here[1].




[1] https://lists.apache.org/thread/mz5956pry8fzowdw399q0hq16zqg5f57




--

Best!
Xuyang




At 2024-07-02 02:42:20, "Ran Jiang via user"  wrote:

Hi team,


We noticed that there wasn't any new version of Stateful Function released 
since last year. Is it still actively being developed? We also noticed that the 
dependencies of it were also old. Is there a roadmap regarding supporting newer 
Flink and Protobuf versions?





Thanks,
Ran

Re: Re: Re: flink1.18 on yarn: submitting a job creates multiple applications

2024-07-02 Thread Xuyang
You can refer to this [1].


Tip: the community's newer syntax EXECUTE STATEMENT SET BEGIN ... END; works as well, as does begin statement set; ...
end;


[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/insert/#insert-into-multiple-tables


--

Best!
Xuyang


On 2024-07-02 11:42:32, "Liu Join" wrote:
>Hi, thanks for the reply. Is there a page in the official docs? I'd like to read up on the details. Thanks!
>
>From: Xuyang 
>Sent: July 2, 2024 11:25
>To: user-zh@flink.apache.org 
>Subject: Re: flink1.18 on yarn: submitting a job creates multiple applications
>
>Hi, if you don't want two applications to show up, try using a statement set to put the two DML
>statements together. Otherwise they are treated as two separate operations and split into two applications.
>
>
>sql: begin statement set; ...  end;
>
>java & scala table api: tableEnv#createStatementSet
>
>
>--
>
>Best!
>Xuyang
>
>
>On 2024-07-02 10:04:34, "Liu Join" wrote:
>>Version: Flink 1.18, Hadoop 3.0.0
>>Submission mode: per-job
>>
>>Problem:
>>1. A program written in Flink SQL, with the same data source, whose output defines one Doris
>>sink plus a print of a table-to-stream conversion: submitting the jar to YARN creates two applications, one whose output is the Doris sink and another whose output is the table-to-stream print.
>>
>>2. A program written in Flink SQL, with the same data source, whose output defines two Doris
>>sinks, table a and table b: submitting the jar to YARN creates two applications, one writing to Doris sink table a and the other to Doris sink table b.
>>
>>
>>What is the reason for this?


Please suggest which version of "opensearch-rest-high-level-client" the Flink OpenSearch connector 1.0.1-1.16 is compatible with

2024-07-01 Thread Sonal Sharma A via user
Hello Team,

We are using Flink version 1.16.3 and we are planning to use the Flink
OpenSearch connector, which requires the dependency
"opensearch-rest-high-level-client". The latest version of
opensearch-rest-high-level-client is 2.15, which does not work with Flink
OpenSearch connector 1.0.1-1.16. We have tried it, and it works up to
"opensearch-rest-high-level-client" version 2.9.

Please suggest which version of "opensearch-rest-high-level-client" the Flink
OpenSearch connector 1.0.1-1.16 is compatible with.

Thanks !!

Regards
Sonal Sharma



Re: Re: flink1.18 on yarn: submitting a job creates multiple applications

2024-07-01 Thread Liu Join
Hi, thanks for the reply. Is there a page in the official docs? I'd like to read up on the details. Thanks!

From: Xuyang 
Sent: July 2, 2024 11:25
To: user-zh@flink.apache.org 
Subject: Re: flink1.18 on yarn: submitting a job creates multiple applications

Hi, if you don't want two applications to show up, try using a statement set to put the two DML
statements together. Otherwise they are treated as two separate operations and split into two applications.


sql: begin statement set; ...  end;

java & scala table api: tableEnv#createStatementSet


--

Best!
Xuyang


On 2024-07-02 10:04:34, "Liu Join" wrote:
>Version: Flink 1.18, Hadoop 3.0.0
>Submission mode: per-job
>
>Problem:
>1. A program written in Flink SQL, with the same data source, whose output defines one Doris
>sink plus a print of a table-to-stream conversion: submitting the jar to YARN creates two applications, one whose output is the Doris sink and another whose output is the table-to-stream print.
>
>2. A program written in Flink SQL, with the same data source, whose output defines two Doris
>sinks, table a and table b: submitting the jar to YARN creates two applications, one writing to Doris sink table a and the other to Doris sink table b.
>
>
>What is the reason for this?


Re:Flink cache support

2024-07-01 Thread Xuyang
Hi, Ganesh.

Can you take a look at whether the cache strategy of the JDBC lookup table can meet your
requirement?




[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/jdbc/#lookup-cache







--

Best!
Xuyang
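
To make the suggestion concrete, a hedged Java sketch. The table name, JDBC URL and cache sizes are made-up placeholders, and the option names follow the JDBC connector page linked above (Flink 1.16+ naming):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupCacheSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Dimension table backed by JDBC with a partial (LRU + TTL) lookup cache.
        tEnv.executeSql(
                "CREATE TABLE dim_products (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:mysql://db-host:3306/mydb',\n"
                        + "  'table-name' = 'products',\n"
                        + "  'lookup.cache' = 'PARTIAL',\n"
                        + "  'lookup.partial-cache.max-rows' = '10000',\n"
                        + "  'lookup.partial-cache.expire-after-write' = '10min'\n"
                        + ")");

        // The cache is used when this table is the build side of a lookup join,
        // i.e. `... JOIN dim_products FOR SYSTEM_TIME AS OF ... ON ...`.
    }
}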




At 2024-07-02 03:35:41, "Ganesh Walse"  wrote:

Hi All,



My prerequisite for datastream is to cache 25-30 table data into the some java 
collection and use them later while processing the stream.
Right now I am just writing select queries and using jdbc api storing those 
results in java collection and filtering data as per need.


Do you have any best way to cache those tables.


Thank you in advance


Thanks,
Ganesh Walse

Re: flink1.18 on yarn: submitting a job creates multiple applications

2024-07-01 Thread Xuyang
Hi, if you don't want two applications to show up, try using a statement set to put the two DML
statements together. Otherwise they are treated as two separate operations and split into two applications.


sql: begin statement set; ...  end;

java & scala table api: tableEnv#createStatementSet


--

Best!
Xuyang


On 2024-07-02 10:04:34, "Liu Join" wrote:
>Version: Flink 1.18, Hadoop 3.0.0
>Submission mode: per-job
>
>Problem:
>1. A program written in Flink SQL, with the same data source, whose output defines one Doris
>sink plus a print of a table-to-stream conversion: submitting the jar to YARN creates two applications, one whose output is the Doris sink and another whose output is the table-to-stream print.
>
>2. A program written in Flink SQL, with the same data source, whose output defines two Doris
>sinks, table a and table b: submitting the jar to YARN creates two applications, one writing to Doris sink table a and the other to Doris sink table b.
>
>
>What is the reason for this?
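
A minimal Java Table API sketch of the statement-set approach suggested above. The table names are placeholders, and the DDL for the source and the two Doris sinks is omitted:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO doris_sink_a SELECT * FROM source_table");
        set.addInsertSql("INSERT INTO doris_sink_b SELECT * FROM source_table");

        // Both INSERTs are compiled into one job graph, so a per-job submission
        // to YARN produces a single application instead of two.
        set.execute();
    }
}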


Re: This definitely counts as a bug

2024-07-01 Thread Yan Zhou
Unsubscribe

On Mon, Jul 1, 2024 at 10:22 PM wjw_bigd...@163.com 
wrote:

> Unsubscribe
>
>
>
>  Replied Message 
> | From | Cuixb |
> | Date | 2024-07-01 22:16 |
> | To | user-zh@flink.apache.org |
> | Cc | |
> | Subject | Re: This definitely counts as a bug |
> The GC pauses can't be called long, but they're definitely not short either. Roughly calculated, with 24G of heap there is about 10 seconds of unresponsiveness, mostly within 10 seconds.
> Sent from my iPhone
>
> > On Jul 1, 2024, at 17:20, rui chen wrote:
> >
> > I suggest checking the JM's GC situation.
> >
> > wjw_bigd...@163.com wrote on Mon, Jul 1, 2024, 17:18:
> >
> >> 退订
> >>
> >>
> >>
> >>  回复的原邮件 
> >> | 发件人 | wjw_bigd...@163.com |
> >> | 日期 | 2024年07月01日 17:13 |
> >> | 收件人 | user-zh |
> >> | 抄送至 | |
> >> | 主题 | 回复:这绝对算是bug |
> >> 退订
> >>
> >>
> >>
> >>  回复的原邮件 
> >> | 发件人 | 星海<2278179...@qq.com.INVALID> |
> >> | 日期 | 2024年06月29日 21:31 |
> >> | 收件人 | user-zh |
> >> | 抄送至 | |
> >> | 主题 | 回复: 这绝对算是bug |
> >> 退订
> >>
> >>
> >> --原始邮件--
> >> 发件人:
> >>  "user-zh"
> >><
> >> cfso3...@126.com;退订
> >> 发送时间:2024年6月29日(星期六) 晚上8:24
> >> 收件人:"user-zh" >>
> >> 主题:Re: 这绝对算是bug
> >>
> >>
> >>
> >> 连接没问题,主要是tm一直在处理写入流,我也看了一下负载,其实不高,但就是不相应,导致报timeout,然后就是最开始那个错误!
> >> 发自我的 iPhone
> >>
> >>  在 2024年6月29日,16:49,Zhanghao Chen  写道:
> >> 
> >>  Hi,从报错看是 JM 丢主了,导致 TM 上 task 全部关停。看下 JM 侧是不是 HA 连接有问题呢?
> >> 
> >>  Best,
> >>  Zhanghao Chen
> >>  
> >>  From: Cuixb  >>  Sent: Saturday, June 29, 2024 10:31
> >>  To: user-zh@flink.apache.org  >>  Subject: 这绝对算是bug
> >> 
> >>  生产环境Flink 1.16.2
> >> 
> >>  2024-06-29 09:17:23
> >>  java.lang.Exception: Job leader for job id
> >> 8ccdd299194a686e3ecda602c3c75bf3 lost leadership.
> >>  at
> >>
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2310)
> >>  at
> >> java.util.Optional.ifPresent(Optional.java:159)
> >>  at
> >>
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2308)
> >>  at
> >>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
> >>  at
> >>
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> >>  at
> >>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
> >>  at
> >>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
> >>  at
> >>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
> >>  at akka.japi.pf
> >> .UnitCaseStatement.apply(CaseStatements.scala:24)
> >>  at akka.japi.pf
> >> .UnitCaseStatement.apply(CaseStatements.scala:20)
> >>  at
> >> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> >>  at
> >> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> >>  at akka.japi.pf
> >> .UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> >>  at
> >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> >>  at
> >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> >>  at
> >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> >>  at
> >> akka.actor.Actor.aroundReceive(Actor.scala:537)
> >>  at
> >> akka.actor.Actor.aroundReceive$(Actor.scala:535)
> >>  at
> >> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> >>  at
> >> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> >>  at
> >> akka.actor.ActorCell.invoke(ActorCell.scala:548)
> >>  at
> >> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> >>  at
> >> akka.dispatch.Mailbox.run(Mailbox.scala:231)
> >>  at
> >> akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> >>  at
> >> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> >>  at
> >> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(For
> >> 
> >>  发自我的 iPhone
> >


flink1.18 on yarn: submitting a job creates multiple applications

2024-07-01 Thread Liu Join
Version: Flink 1.18, Hadoop 3.0.0
Submission mode: per-job

Problem:
1. A program written in Flink SQL, with the same data source, whose output defines one Doris
sink plus a print of a table-to-stream conversion: submitting the jar to YARN creates two applications, one whose output is the Doris sink and another whose output is the table-to-stream print.

2. A program written in Flink SQL, with the same data source, whose output defines two Doris
sinks, table a and table b: submitting the jar to YARN creates two applications, one writing to Doris sink table a and the other to Doris sink table b.


What is the reason for this?


unsubscribe

2024-07-01 Thread Phil Stavridis


Flink cache support

2024-07-01 Thread Ganesh Walse
Hi All,

For my DataStream job I need to cache the data of 25-30 tables into some
Java collections and use them later while processing the stream.
Right now I am just writing SELECT queries, storing the results in Java
collections via the JDBC API, and filtering the data as needed.

Is there a good way to cache those tables?

Thank you in advance

Thanks,
Ganesh Walse


Stateful Function Roadmap

2024-07-01 Thread Ran Jiang via user
Hi team,

We noticed that there wasn't any new version of Stateful Function released
since last year. Is it still actively being developed? We also noticed that
the dependencies of it were also old. Is there a roadmap regarding
supporting newer Flink and Protobuf versions?


Thanks,
Ran


Re: unsubscribe

2024-07-01 Thread wjw_bigd...@163.com
Unsubscribe


 Replied Message 
| From | wjw_bigd...@163.com |
| Date | 2024-07-01 17:17 |
| To | user-zh |
| Cc | |
| Subject | unsubscribe |


 Replied Message 
| From | wjw_bigd...@163.com |
| Date | 2024-07-01 17:12 |
| To | user-zh |
| Cc | |
| Subject | unsubscribe |


 Replied Message 
| From | zhanggongx |
| Date | 2024-07-01 16:52 |
| To | user-zh |
| Cc | |
| Subject | |


Re: This definitely counts as a bug

2024-07-01 Thread wjw_bigd...@163.com
Unsubscribe



 Replied Message 
| From | Cuixb |
| Date | 2024-07-01 22:16 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Re: This definitely counts as a bug |
The GC pauses can't be called long, but they're definitely not short either. Roughly calculated, with 24G of heap there is about 10 seconds of unresponsiveness, mostly within 10 seconds.
Sent from my iPhone

> On Jul 1, 2024, at 17:20, rui chen wrote:
>
> I suggest checking the JM's GC situation.
>
> wjw_bigd...@163.com wrote on Mon, Jul 1, 2024, 17:18:
>
>> 退订
>>
>>
>>
>>  回复的原邮件 
>> | 发件人 | wjw_bigd...@163.com |
>> | 日期 | 2024年07月01日 17:13 |
>> | 收件人 | user-zh |
>> | 抄送至 | |
>> | 主题 | 回复:这绝对算是bug |
>> 退订
>>
>>
>>
>>  回复的原邮件 
>> | 发件人 | 星海<2278179...@qq.com.INVALID> |
>> | 日期 | 2024年06月29日 21:31 |
>> | 收件人 | user-zh |
>> | 抄送至 | |
>> | 主题 | 回复: 这绝对算是bug |
>> 退订
>>
>>
>> --原始邮件--
>> 发件人:
>>  "user-zh"
>><
>> cfso3...@126.com;退订
>> 发送时间:2024年6月29日(星期六) 晚上8:24
>> 收件人:"user-zh">
>> 主题:Re: 这绝对算是bug
>>
>>
>>
>> 连接没问题,主要是tm一直在处理写入流,我也看了一下负载,其实不高,但就是不相应,导致报timeout,然后就是最开始那个错误!
>> 发自我的 iPhone
>>
>>  在 2024年6月29日,16:49,Zhanghao Chen > 
>>  Hi,从报错看是 JM 丢主了,导致 TM 上 task 全部关停。看下 JM 侧是不是 HA 连接有问题呢?
>> 
>>  Best,
>>  Zhanghao Chen
>>  
>>  From: Cuixb >  Sent: Saturday, June 29, 2024 10:31
>>  To: user-zh@flink.apache.org >  Subject: 这绝对算是bug
>> 
>>  生产环境Flink 1.16.2
>> 
>>  2024-06-29 09:17:23
>>  java.lang.Exception: Job leader for job id
>> 8ccdd299194a686e3ecda602c3c75bf3 lost leadership.
>>  at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2310)
>>  at
>> java.util.Optional.ifPresent(Optional.java:159)
>>  at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2308)
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
>>  at
>> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>>  at akka.japi.pf
>> .UnitCaseStatement.apply(CaseStatements.scala:24)
>>  at akka.japi.pf
>> .UnitCaseStatement.apply(CaseStatements.scala:20)
>>  at
>> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>  at
>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>  at akka.japi.pf
>> .UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>>  at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>  at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>  at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>  at
>> akka.actor.Actor.aroundReceive(Actor.scala:537)
>>  at
>> akka.actor.Actor.aroundReceive$(Actor.scala:535)
>>  at
>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>>  at
>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>>  at
>> akka.actor.ActorCell.invoke(ActorCell.scala:548)
>>  at
>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>>  at
>> akka.dispatch.Mailbox.run(Mailbox.scala:231)
>>  at
>> akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>>  at
>> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>  at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(For
>> 
>>  发自我的 iPhone
>


Re: JVM heap not getting freed

2024-07-01 Thread Feng Jin
Hi Ganesh

After the job is finished, was there a Full GC during this period? You can
try to capture a heap dump to trigger FullGC and then check the actual
memory usage.

If the memory usage is still high after a Full GC, you should analyze the
heap dump file.

Best,
Feng
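
If it helps, a small sketch of triggering a "live" heap dump programmatically; the live option only dumps reachable objects, which forces a Full GC first, similar to `jmap -dump:live,...`. The output path is just an example:

import java.lang.management.ManagementFactory;

import com.sun.management.HotSpotDiagnosticMXBean;

public class HeapDumpSketch {
    public static void main(String[] args) throws Exception {
        HotSpotDiagnosticMXBean diagnostic = ManagementFactory.newPlatformMXBeanProxy(
                ManagementFactory.getPlatformMBeanServer(),
                "com.sun.management:type=HotSpotDiagnostic",
                HotSpotDiagnosticMXBean.class);
        // live = true dumps only reachable objects and therefore triggers a Full GC.
        diagnostic.dumpHeap("/tmp/flink-heap.hprof", true);
    }
}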

On Mon, Jul 1, 2024 at 9:32 PM Ganesh Walse  wrote:

> Hi Team,
>
> After I ran my fink application  on cluster and application runs
> successfully but my JVM heap is still showing 50% is filled.
>
> What will be the reason?
>


Re: This definitely counts as a bug

2024-07-01 Thread Cuixb
The GC pauses can't be called long, but they're definitely not short either. Roughly calculated, with 24G of heap there is about 10 seconds of unresponsiveness, mostly within 10 seconds.
Sent from my iPhone

> On Jul 1, 2024, at 17:20, rui chen wrote:
> 
> I suggest checking the JM's GC situation.
> 
> wjw_bigd...@163.com wrote on Mon, Jul 1, 2024, 17:18:
> 
>> 退订
>> 
>> 
>> 
>>  回复的原邮件 
>> | 发件人 | wjw_bigd...@163.com |
>> | 日期 | 2024年07月01日 17:13 |
>> | 收件人 | user-zh |
>> | 抄送至 | |
>> | 主题 | 回复:这绝对算是bug |
>> 退订
>> 
>> 
>> 
>>  回复的原邮件 
>> | 发件人 | 星海<2278179...@qq.com.INVALID> |
>> | 日期 | 2024年06月29日 21:31 |
>> | 收件人 | user-zh |
>> | 抄送至 | |
>> | 主题 | 回复: 这绝对算是bug |
>> 退订
>> 
>> 
>> --原始邮件--
>> 发件人:
>>  "user-zh"
>><
>> cfso3...@126.com;退订
>> 发送时间:2024年6月29日(星期六) 晚上8:24
>> 收件人:"user-zh"> 
>> 主题:Re: 这绝对算是bug
>> 
>> 
>> 
>> 连接没问题,主要是tm一直在处理写入流,我也看了一下负载,其实不高,但就是不相应,导致报timeout,然后就是最开始那个错误!
>> 发自我的 iPhone
>> 
>>  在 2024年6月29日,16:49,Zhanghao Chen > 
>>  Hi,从报错看是 JM 丢主了,导致 TM 上 task 全部关停。看下 JM 侧是不是 HA 连接有问题呢?
>> 
>>  Best,
>>  Zhanghao Chen
>>  
>>  From: Cuixb >  Sent: Saturday, June 29, 2024 10:31
>>  To: user-zh@flink.apache.org >  Subject: 这绝对算是bug
>> 
>>  生产环境Flink 1.16.2
>> 
>>  2024-06-29 09:17:23
>>  java.lang.Exception: Job leader for job id
>> 8ccdd299194a686e3ecda602c3c75bf3 lost leadership.
>>  at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2310)
>>  at
>> java.util.Optional.ifPresent(Optional.java:159)
>>  at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2308)
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
>>  at
>> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
>>  at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>>  at akka.japi.pf
>> .UnitCaseStatement.apply(CaseStatements.scala:24)
>>  at akka.japi.pf
>> .UnitCaseStatement.apply(CaseStatements.scala:20)
>>  at
>> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>  at
>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>  at akka.japi.pf
>> .UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>>  at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>  at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>  at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>  at
>> akka.actor.Actor.aroundReceive(Actor.scala:537)
>>  at
>> akka.actor.Actor.aroundReceive$(Actor.scala:535)
>>  at
>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>>  at
>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>>  at
>> akka.actor.ActorCell.invoke(ActorCell.scala:548)
>>  at
>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>>  at
>> akka.dispatch.Mailbox.run(Mailbox.scala:231)
>>  at
>> akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>>  at
>> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>  at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(For
>> 
>>  发自我的 iPhone
> 



JVM heap not getting freed

2024-07-01 Thread Ganesh Walse
Hi Team,

After I ran my Flink application on the cluster, the application completed
successfully, but my JVM heap still shows 50% filled.

What could be the reason?


Re: [External] Re:Re: Backpressure issue with Flink Sql Job

2024-07-01 Thread Penny Rastogi
Hi Ashish,

How are you performing the backfill operation? Some time window? Can you
specify details?

I mean, if it helps, you can check out
https://www.ververica.com/blog/how-to-write-fast-flink-sql .


Regards

On Tue, Jun 25, 2024 at 4:30 PM Ashish Khatkar via user <
user@flink.apache.org> wrote:

> Hi Xuyang,
>
> The input records are balanced across subtasks, with debloating buffers
> enabled, the records this subtask receives is less as compared to other
> subtasks.
>
> If the differences among all subtasks are not significant, we might be
>> encountering an IO bottleneck. In this case, we could try increasing the
>> parallelism of this vertex, or, as Penny suggested, we could try to enhance
>> the memory of tm.
>
>
> I checked k8s metrics for the taskmanagers and I don't see any IO issues,
> I can relook at it again at the time we started backfill. Regarding
> parallelism, I don't think Flink sql allows separate parallelism for a
> specific operator. I can try to increase the parallelism for the entire job
> (which will probably be counter productive for sources as they have 20
> partitions). Regarding memory, I have already given 72G mem to each
> taskmanager (slot, 1 tm. = 1 slot). We are currently migrating join use
> cases from our in house system that we built on top of datastream API and
> that system never required this much of resources and we never faced any
> such issues for this job.
>
> On Tue, Jun 25, 2024 at 7:30 AM Xuyang  wrote:
>
>> Hi, Ashish.
>>
>> Can you confirm whether, on the subtask label page of this sink
>> materializer node, the input records for each subtask are approximately the
>> same?
>>
>> If the input records for subtask number 5 are significantly larger
>> compared to the others, it signifies a serious data skew, and it would be
>> necessary to modify the SQL appropriately to resolve this skew.
>>
>> If the differences among all subtasks are not significant, we might be
>> encountering an IO bottleneck. In this case, we could try increasing the
>> parallelism of this vertex, or, as Penny suggested, we could try to enhance
>> the memory of tm.
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> On 2024-06-24 21:28:58, "Penny Rastogi" wrote:
>>
>> Hi Ashish,
>> Can you check a few things.
>> 1. Is your source broker count also 20 for both topics?
>> 2. You can try increasing the state operation memory and reduce the disk
>> I/O.
>>
>>   - Increase the number of CU resources in a single slot.
>>   - Set optimization parameters:
>>     - taskmanager.memory.managed.fraction=x
>>     - state.backend.rocksdb.block.cache-size=x
>>     - state.backend.rocksdb.writebuffer.size=x
>> 3. If possible, try a left window join for your streams.
>> - Please share what sink you are using. Also, the per-operator,
>> source and sink throughput, if possible?
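
As a rough illustration of point 3 above: an interval ("window") variant of the left join only keeps a bounded time range of state instead of the full history of both topics. A hedged Java/SQL sketch with made-up table and column names (the DDL with watermarks on o_ts / p_ts is omitted):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IntervalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "INSERT INTO joined_sink\n"
                        + "SELECT o.order_id, o.o_ts, p.p_ts\n"
                        + "FROM orders o\n"
                        + "LEFT JOIN payments p\n"
                        + "  ON o.order_id = p.order_id\n"
                        + " AND p.p_ts BETWEEN o.o_ts - INTERVAL '1' HOUR\n"
                        + "                AND o.o_ts + INTERVAL '1' HOUR");
        // State outside the +/- 1 hour range can be dropped, unlike in a
        // regular (unbounded) left join.
    }
}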
>>
>>
>> On Mon, Jun 24, 2024 at 3:32 PM Ashish Khatkar via user <
>> user@flink.apache.org> wrote:
>>
>>> Hi all,
>>>
>>> We are facing backpressure in the flink sql job from the sink and the
>>> backpressure only comes from a single task. This causes the checkpoint to
>>> fail despite enabling unaligned checkpoints and using debloating buffers.
>>> We enabled flamegraph and the task spends most of the time doing rocksdb
>>> get and put. The sql job does a left join over two streams with a
>>> parallelism of 20. The total data the topics have is 540Gb for one topic
>>> and roughly 60Gb in the second topic. We are running 20 taskmanagers with 1
>>> slot each with each taskmanager having 72G mem and 9 cpu.
>>> Can you provide any help on how to go about fixing the pipeline? We are
>>> using Flink 1.17.2. The issue is similar to this stackoverflow thread
>>> ,
>>> instead of week it starts facing back pressure as soon as the lag comes
>>> down to 4-5%.
>>>
>>> [image: image.png]
>>>
>>


Re: This definitely counts as a bug

2024-07-01 Thread rui chen
I suggest checking the JM's GC situation.
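
One quick way to look at this, as a sketch: read the GC MXBeans of the JVM in question. The same numbers should also be visible through Flink's Status.JVM.GarbageCollector metrics and in the web UI:

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcCheckSketch {
    public static void main(String[] args) {
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Cumulative collection count and time since JVM start.
            System.out.println(gc.getName()
                    + ": count=" + gc.getCollectionCount()
                    + ", timeMs=" + gc.getCollectionTime());
        }
    }
}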

wjw_bigd...@163.com wrote on Mon, Jul 1, 2024, 17:18:

> 退订
>
>
>
>  回复的原邮件 
> | 发件人 | wjw_bigd...@163.com |
> | 日期 | 2024年07月01日 17:13 |
> | 收件人 | user-zh |
> | 抄送至 | |
> | 主题 | 回复:这绝对算是bug |
> 退订
>
>
>
>  回复的原邮件 
> | 发件人 | 星海<2278179...@qq.com.INVALID> |
> | 日期 | 2024年06月29日 21:31 |
> | 收件人 | user-zh |
> | 抄送至 | |
> | 主题 | 回复: 这绝对算是bug |
> 退订
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> cfso3...@126.com;退订
> 发送时间:2024年6月29日(星期六) 晚上8:24
> 收件人:"user-zh"
> 主题:Re: 这绝对算是bug
>
>
>
> 连接没问题,主要是tm一直在处理写入流,我也看了一下负载,其实不高,但就是不相应,导致报timeout,然后就是最开始那个错误!
> 发自我的 iPhone
>
>  在 2024年6月29日,16:49,Zhanghao Chen  
>  Hi,从报错看是 JM 丢主了,导致 TM 上 task 全部关停。看下 JM 侧是不是 HA 连接有问题呢?
> 
>  Best,
>  Zhanghao Chen
>  
>  From: Cuixb   Sent: Saturday, June 29, 2024 10:31
>  To: user-zh@flink.apache.org   Subject: 这绝对算是bug
> 
>  生产环境Flink 1.16.2
> 
>  2024-06-29 09:17:23
>  java.lang.Exception: Job leader for job id
> 8ccdd299194a686e3ecda602c3c75bf3 lost leadership.
>  at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2310)
>  at
> java.util.Optional.ifPresent(Optional.java:159)
>  at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2308)
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
>  at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>  at akka.japi.pf
> .UnitCaseStatement.apply(CaseStatements.scala:24)
>  at akka.japi.pf
> .UnitCaseStatement.apply(CaseStatements.scala:20)
>  at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>  at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>  at akka.japi.pf
> .UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>  at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>  at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>  at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>  at
> akka.actor.Actor.aroundReceive(Actor.scala:537)
>  at
> akka.actor.Actor.aroundReceive$(Actor.scala:535)
>  at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>  at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>  at
> akka.actor.ActorCell.invoke(ActorCell.scala:548)
>  at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>  at
> akka.dispatch.Mailbox.run(Mailbox.scala:231)
>  at
> akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>  at
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>  at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(For
> 
>  发自我的 iPhone


Re: Re: This definitely counts as a bug

2024-07-01 Thread wjw_bigd...@163.com
Unsubscribe


 Replied Message 
| From | wjw_bigd...@163.com |
| Date | 2024-07-01 17:13 |
| To | user-zh |
| Cc | |
| Subject | Re: This definitely counts as a bug |


 Replied Message 
| From | 星海 <2278179...@qq.com.INVALID> |
| Date | 2024-06-29 21:31 |
| To | user-zh |
| Cc | |
| Subject | Re: This definitely counts as a bug |


---- Original Email ----
From: "user-zh"



Re: unsubscribe

2024-07-01 Thread wjw_bigd...@163.com
Unsubscribe


 Replied Message 
| From | wjw_bigd...@163.com |
| Date | 2024-07-01 17:12 |
| To | user-zh |
| Cc | |
| Subject | unsubscribe |


 Replied Message 
| From | zhanggongx |
| Date | 2024-07-01 16:52 |
| To | user-zh |
| Cc | |
| Subject | |


Re: Re: This definitely counts as a bug

2024-07-01 Thread wjw_bigd...@163.com
Unsubscribe


 Replied Message 
| From | 星海 <2278179...@qq.com.INVALID> |
| Date | 2024-06-29 21:31 |
| To | user-zh |
| Cc | |
| Subject | Re: This definitely counts as a bug |


---- Original Email ----
From: "user-zh"



Re: unsubscribe

2024-07-01 Thread wjw_bigd...@163.com
Unsubscribe


 Replied Message 
| From | zhanggongx |
| Date | 2024-07-01 16:52 |
| To | user-zh |
| Cc | |
| Subject | |


Re: This definitely counts as a bug

2024-06-29 Thread Cuixb
The connection is fine. The main thing is that the TM is busy processing the write stream the whole time. I also looked at the load; it is actually not high, but the TM simply stops responding, which causes the timeout to be reported, and then the error shown at the very beginning!
Sent from my iPhone

> On Jun 29, 2024, at 16:49, Zhanghao Chen wrote:
> 
> Hi, from the error it looks like the JM lost leadership, which caused all tasks on the TM to be shut down. Could you check whether the JM side has a problem with its HA connection?
> 
> Best,
> Zhanghao Chen
> 
> From: Cuixb 
> Sent: Saturday, June 29, 2024 10:31
> To: user-zh@flink.apache.org 
> Subject: This definitely counts as a bug
> 
> Production environment: Flink 1.16.2
> 
> 2024-06-29 09:17:23
> java.lang.Exception: Job leader for job id 8ccdd299194a686e3ecda602c3c75bf3 
> lost leadership.
>at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2310)
>at java.util.Optional.ifPresent(Optional.java:159)
>at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2308)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
>at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>at akka.actor.Actor.aroundReceive(Actor.scala:537)
>at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(For
> 
> Sent from my iPhone



Re: This definitely counts as a bug

2024-06-29 Thread Zhanghao Chen
Hi, from the error it looks like the JM lost leadership, which caused all tasks on the TM to be shut down. Could you check whether the JM side has a problem with its HA connection?

Best,
Zhanghao Chen

From: Cuixb 
Sent: Saturday, June 29, 2024 10:31
To: user-zh@flink.apache.org 
Subject: This definitely counts as a bug

Production environment: Flink 1.16.2

2024-06-29 09:17:23
java.lang.Exception: Job leader for job id 8ccdd299194a686e3ecda602c3c75bf3 
lost leadership.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2310)
at java.util.Optional.ifPresent(Optional.java:159)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2308)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(For

Sent from my iPhone


elasticsearch-3.1.0

2024-06-29 Thread Tauseef Janvekar
Dear Team,

Elasticsearch 8 support has been added for flink but not exposed yet.
As per reports, elasticsearch-3.1.0 release has it and we would like to try
it out.

Should we manually build the code and use the jar?
Is there any mvn repo link that can be used?

Any sample code( I have gone through the TestCases of elastic8) would be
greatly appreciated.

Thanks,
Tauseef


This definitely counts as a bug

2024-06-28 Thread Cuixb
Production environment: Flink 1.16.2

2024-06-29 09:17:23
java.lang.Exception: Job leader for job id 8ccdd299194a686e3ecda602c3c75bf3 
lost leadership.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2310)
at java.util.Optional.ifPresent(Optional.java:159)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2308)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(For

Sent from my iPhone


Re: unsubscribe

2024-06-28 Thread wjw_bigdata
Unsubscribe




 Replied Message 
| From | 张胜军<15910621...@139.com> |
| Date | June 28, 2024 12:57 |
| To | user-zh |
| Subject | unsubscribe |


Unsubscribe























The following is the content of the forwarded email
From:Yanquan Lv  
To:user-zh 
Date:2024-06-26 16:46:05
Subject:Re: Re:cdc读取oracle数据如何解析

可以的,通过设置 debezium 的 decimal.handling.mode

[1] 参数可以实现你的需求,转成
double 或者 string 来处理。

[1]
https://debezium.io/documentation/reference/1.9/connectors/oracle.html#oracle-numeric-types

ha.fen...@aisino.com  于2024年6月26日周三 16:35写道:

Map customConverterConfigs = new HashMap<>()
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric")
PRICE是decimal 类型,按上面设置可以正常了,ID是NUMBER,还是显示"ID":{"scale":0,"value":"AQ=="}
我试了下,我的ID是1,base64后是MQ==,这个ID还不是base64以后的结果。
通过JsonDebeziumDeserializationSchema(true,
customConverterConfigs)打印出来schema
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"scale"},{"type":"bytes","optional":false,"field":"value"}],"optional":false,"name":"io.debezium.data.VariableScaleDecimal","version":1,"doc":"Variable
scaled decimal","field":"ID"},
那是不是有什么方法,把对应的字段类型先设置好传进去

发件人: Yanquan Lv
发送时间: 2024-06-26 14:46
收件人: user-zh
主题: Re: 回复:cdc读取oracle数据如何解析
你好,你的 ID 和 PRINCE 字段类型是 decimal 吗,decimal 默认的展示方式是使用 BASE64 处理

可以通过添加下面代码来让展示信息更直观。

Map customConverterConfigs = new HashMap<>()
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric")
JsonDebeziumDeserializationSchema schema = new
JsonDebeziumDeserializationSchema(includeSchema,
customConverterConfigs)




ha.fen...@aisino.com  于2024年6月25日周二 17:26写道:

数据没问题
"ID" "NAME"   "ADDTIME""PRICE"
1 "aa" 2024-6-25 14:21:33  12.22

发件人: 15868861416
发送时间: 2024-06-25 17:19
收件人: user-zh@flink.apache.org
主题: 回复:cdc读取oracle数据如何解析
检查一下oracle中的DEBEZIUM.PRODUCTS这个表的数据,看你解析中有字符串




| |
博星
|
|
15868861...@163.com
|


 回复的原邮件 
| 发件人 | ha.fen...@aisino.com |
| 发送日期 | 2024年06月25日 15:54 |
| 收件人 | user-zh |
| 主题 | cdc读取oracle数据如何解析 |
根据文档的代码
JdbcIncrementalSource oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("ORCLCDB")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build()
返回的结果:


{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}

如何解析?PRICE应该是数字,ID也是数字,这里显示的都不同








unsubscribe

2024-06-27 Thread 张胜军



Unsubscribe























The following is the content of the forwarded email
From:Yanquan Lv  
To:user-zh 
Date:2024-06-26 16:46:05
Subject:Re: Re:cdc读取oracle数据如何解析

可以的,通过设置 debezium 的 decimal.handling.mode

[1] 参数可以实现你的需求,转成
double 或者 string 来处理。

[1]
https://debezium.io/documentation/reference/1.9/connectors/oracle.html#oracle-numeric-types

ha.fen...@aisino.com  于2024年6月26日周三 16:35写道:

> Map customConverterConfigs = new HashMap<>()
> customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
> "numeric")
> PRICE是decimal 类型,按上面设置可以正常了,ID是NUMBER,还是显示"ID":{"scale":0,"value":"AQ=="}
> 我试了下,我的ID是1,base64后是MQ==,这个ID还不是base64以后的结果。
> 通过JsonDebeziumDeserializationSchema(true,
> customConverterConfigs)打印出来schema
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"scale"},{"type":"bytes","optional":false,"field":"value"}],"optional":false,"name":"io.debezium.data.VariableScaleDecimal","version":1,"doc":"Variable
> scaled decimal","field":"ID"},
> 那是不是有什么方法,把对应的字段类型先设置好传进去
>
> 发件人: Yanquan Lv
> 发送时间: 2024-06-26 14:46
> 收件人: user-zh
> 主题: Re: 回复:cdc读取oracle数据如何解析
> 你好,你的 ID 和 PRINCE 字段类型是 decimal 吗,decimal 默认的展示方式是使用 BASE64 处理
>
> 可以通过添加下面代码来让展示信息更直观。
>
> Map customConverterConfigs = new HashMap<>()
> customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
> "numeric")
> JsonDebeziumDeserializationSchema schema = new
> JsonDebeziumDeserializationSchema(includeSchema,
> customConverterConfigs)
>
>
>
>
> ha.fen...@aisino.com  于2024年6月25日周二 17:26写道:
>
> > 数据没问题
> > "ID" "NAME"   "ADDTIME""PRICE"
> > 1 "aa" 2024-6-25 14:21:33  12.22
> >
> > 发件人: 15868861416
> > 发送时间: 2024-06-25 17:19
> > 收件人: user-zh@flink.apache.org
> > 主题: 回复:cdc读取oracle数据如何解析
> > 检查一下oracle中的DEBEZIUM.PRODUCTS这个表的数据,看你解析中有字符串
> >
> >
> >
> >
> > | |
> > 博星
> > |
> > |
> > 15868861...@163.com
> > |
> >
> >
> >  回复的原邮件 
> > | 发件人 | ha.fen...@aisino.com |
> > | 发送日期 | 2024年06月25日 15:54 |
> > | 收件人 | user-zh |
> > | 主题 | cdc读取oracle数据如何解析 |
> > 根据文档的代码
> > JdbcIncrementalSource oracleChangeEventSource =
> > new OracleSourceBuilder()
> > .hostname("host")
> > .port(1521)
> > .databaseList("ORCLCDB")
> > .schemaList("DEBEZIUM")
> > .tableList("DEBEZIUM.PRODUCTS")
> > .username("username")
> > .password("password")
> > .deserializer(new JsonDebeziumDeserializationSchema())
> > .includeSchemaChanges(true) // output the schema changes as well
> > .startupOptions(StartupOptions.initial())
> > .debeziumProperties(debeziumProperties)
> > .splitSize(2)
> > .build()
> > 返回的结果:
> >
> >
> {"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}
> >
> > 如何解析?PRICE应该是数字,ID也是数字,这里显示的都不同
> >
>






Fail to pull image autoscaling

2024-06-27 Thread Enric Ott
Hi, Community:
I'm using flink-kubernetes-operator version 1.6 to deploy the Flink autoscaling
example
(https://github.com/apache/flink-kubernetes-operator/blob/main/examples/autoscaling/autoscaling.yaml)
on K8S, but encountered the error "fail to pull the image
autoscaling-example". Could anyone give me some clues to work this out?
Thanks.





------------------ Original ------------------
From: "Enric Ott" <243816...@qq.com>
Date: 2024-06-25 3:28
To: "Rion Williams"

Re: Flink write ADLS show error: No FileSystem for scheme "file"

2024-06-26 Thread Xiao Xu
Hi Gabor,

Thanks for the reply. I made sure there is no conflict in Maven: all the Hadoop
dependencies are in provided scope,
and I checked that my resulting jar does not contain
(src/main/resources/META-INF/services).

This is my pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test.flink</groupId>
    <artifactId>flink-sync</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <!-- property tag names were lost in the archive; the values were
             1.8, 1.8, 1.18.1, 1.8, 2.12, 3.2.0, 3.3.4, 2.16.0, 3.2.0.
             Only the two names referenced below are certain. -->
        <flink.version>1.18.1</flink.version>
        <log4j.version>2.16.0</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-azure-fs-hadoop</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>



And as in my reply on Stack Overflow, I found that the hadoop-common file:
https://github.com/apache/hadoop/blob/release-3.3.4-RC1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3374
does not load any filesystem. Digging into the ServiceLoader.load(FileSystem.class)
source code, it looks like a different class loader makes it not load
any filesystem.
I changed ServiceLoader.load(FileSystem.class) to
ServiceLoader.load(FileSystem.class,
FileSystem.class.getClassLoader()), replaced the flink-fs-azure-hadoop
plugin, and it works now.
So I'm not sure why it works.

Gabor Somogyi  于2024年6月26日周三 16:52写道:

> Hi Xiao,
>
> I'm not quite convinced that the azure plugin ruined your workload, I
> would take a look at the dependency graph you've in the pom.
> Adding multiple deps can conflict in terms of class loader services
> (src/main/resources/META-INF/services).
>
> As an example you've 2 such dependencies where
> org.apache.flink.core.fs.FileSystemFactory is in the jar.
> Hadoop core contains "flie" and the other one something different. Let's
> say you don't use service merge plugin in your
> maven project. Such case Hadoop core `file` entry will be just overwritten
> by the second one.
>
> Solution: Either avoid deps with conflicting services or add 
> ServicesResourceTransformer
> to your maven project.
>
> G
>
>
> On Wed, Jun 26, 2024 at 10:16 AM Xiao Xu  wrote:
>
>> Hi, all
>>
>> I try to use Flink to write Azure Blob Storage which called ADLS, I put
>> the flink-azure-fs-hadoop jar in plugins directory and when I start my
>> write job it shows:
>>
>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>> FileSystem for scheme "file"
>> at
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at
>> org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at
>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at
>> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>

Flink k8s operator starts wrong job config from FlinkSessionJob

2024-06-26 Thread Peter Klauke
Hi all,

we're running a session cluster and I submit around 20 jobs to it at the
same time by creating FlinkSessionJob Kubernetes resources. After
sufficient time there are 20 jobs created and running healthy. However, it
appears that some jobs are started with the wrong config. As a result some
jobs are created multiple times, others are missing completely. Each job
runs the same logic, differing only by one argument that specifies the
country code, what determines the Kafka topic to read from and the sink
name.

The job code looks essentially like this:


parser = argparse.ArgumentParser(description="Process some input
> arguments.")
> parser.add_argument("--country", required=True, help="Country code to
> process")
> parser.add_argument("--pyFiles", required=False, help="Python files")
> args = parser.parse_args()
> country = args.country
> if country is None:
> raise ValueError("Country argument (--country) not provided.")
> t_env.execute_sql(f"""
> CREATE OR REPLACE TABLE source_kafka (
> raw_payload BYTES,
> data AS parse(raw_payload),
> `timestamp` AS parse(raw_payload).`timestamp`,
> WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'events.{country}',
> ...
> )
> """)
> table_result = t_env.sql_query("""
>...
> """)
> topic_name = "courier_states"
> sink_name = f"{topic_name}_{country}"
> env.create_temporary_table(sink_name,
> TableDescriptor.for_connector("kafka").schema(...).option("topic",
> topic_name).option(...))
> table_result.execute_insert(sink_name).wait()


The created FlinkSessionJob resources look mostly as I expect them to:

 Spec
>Deployment Name:  pyflink-streaming-job-courier-states
>Job:
>  Args:
>-py
>/opt/flink/entrypoint.py
>--country
>hu
>--pyFiles
>/opt/flink
>  Entry Class:   org.apache.flink.client.python.PythonDriver
>  Parallelism:   1
>  State: running
>  Upgrade Mode:  savepoint
>  Status:
>Error:  
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.util.FlinkRuntimeException:
> java.util.concurrent.Time │
>  
> outException","throwableList":[{"type":"org.apache.flink.util.FlinkRuntimeException","message":"java.util.concurrent.TimeoutException"},{"type":"java.util.concurrent.Timeou
> │
>  tException"}]}
>  Job Status:
>  Job Id:0504be940001
>  Job Name:
> insert-into_default_catalog.default_database.courier_states_eg


Only the generated "Job Name" is incorrect for some of them because the
corresponding job is started with the wrong configuration, the Job Name
should have the suffix "_hu". As you can see there are some Timeout
Exceptions, some ReconciliationExceptions and occasionally also
"RestHandlerException: File e37b911b-eec9-4e75-a6db-befc132a9c2b_empty.jar
does not exist", but none of them would explain why a job is started with a
wrong configuration.

We're using Flink 1.17.1 and Flink Kubernetes Operator 1.8.0 (1.5.0 same
issue).

For context:
We're having the use-case that we need to run the same Flink job for many
countries. For every country there is a separate Kafka topic to read from.
Every computation is separated for a country, e.g. all group by operations
are grouping by country code among other grouping columns. Having one Kafka
source subscribed to multiple Kafka topics (e.g. topic-pattern parameter)
has issues regarding the watermarks. The topics of some countries (with
less messages) are consumed much faster than other countries (with more
messages). That makes all messages from countries with more messages be
considered as late messages, yielding wrong window aggregation results.
What we'd need is per-key watermarking, what's not supported. Also,
initially I assumed that watermark alignment would be helpful here, but I
didn't get it working here. Hence running a session cluster for the same
Flink code with one job per country sounds like a convenient idea to me.
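
For reference, a hedged Java sketch of the watermark alignment API mentioned above (the source-level alignment exists since roughly Flink 1.15, per-split alignment since 1.17; a similar builder exists in PyFlink). Group name, drift, broker and topic are placeholders:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignmentSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events.hu")
                .setGroupId("alignment-sketch")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        WatermarkStrategy<String> strategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // All sources/splits in the "countries" group stay within 30s of each other.
                .withWatermarkAlignment("countries", Duration.ofSeconds(30), Duration.ofSeconds(1));

        env.fromSource(source, strategy, "events-source").print();
        env.execute("alignment-sketch");
    }
}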

As far as I can see this looks like a bug of the Flink Kubernetes operator
to me. The only workaround I see would be to submit the jobs one by one,
but that's not really feasible for 20+ different jobs.
Does anyone have a good idea how to fix this?


Re: Flink write ADLS show error: No FileSystem for scheme "file"

2024-06-26 Thread Gabor Somogyi
Hi Xiao,

I'm not quite convinced that the Azure plugin ruined your workload; I would
take a look at the dependency graph you have in the pom.
Adding multiple deps can conflict in terms of class loader services
(src/main/resources/META-INF/services).

As an example, you may have 2 such dependencies where
org.apache.flink.core.fs.FileSystemFactory is in the jar.
Hadoop core contains "file" and the other one something different. Let's
say you don't use a service-merge plugin in your
Maven project. In such a case Hadoop core's `file` entry will simply be overwritten
by the second one.

Solution: Either avoid deps with conflicting services or add
ServicesResourceTransformer
to your maven project.

G


On Wed, Jun 26, 2024 at 10:16 AM Xiao Xu  wrote:

> Hi, all
>
> I try to use Flink to write Azure Blob Storage which called ADLS, I put
> the flink-azure-fs-hadoop jar in plugins directory and when I start my
> write job it shows:
>
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "file"
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262)
> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.(AbfsOutputStream.java:173)
> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580)
> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301)
> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>
> I search the issue looks like this:
> https://stackoverflow.com/questions/77238642/apache-flink-azure-abfs-file-sink-error-streaming-unsupportedfilesystemexcep
>
> my env:
> Flink: 1.18.1
> JDK: 1.8
>
> Does anyone else have the same problem?
>


Re: Re: How to parse data read from Oracle with CDC

2024-06-26 Thread Yanquan Lv
Yes, you can. Setting Debezium's decimal.handling.mode
[1] parameter can achieve what you need: convert the values to
double or string for handling.

[1]
https://debezium.io/documentation/reference/1.9/connectors/oracle.html#oracle-numeric-types
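
A sketch of how that property can be passed along; this Properties object is what goes into `.debeziumProperties(debeziumProperties)` on the OracleSourceBuilder quoted further down in this thread. Only the property line is the point here:

import java.util.Properties;

public class DebeziumDecimalProps {
    public static Properties debeziumProperties() {
        Properties props = new Properties();
        // Emit NUMBER/DECIMAL columns as plain strings instead of the
        // Base64-encoded struct shown in the original question
        // (other accepted values per the Debezium docs: "double", "precise").
        props.setProperty("decimal.handling.mode", "string");
        return props;
    }
}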

ha.fen...@aisino.com wrote on Wed, Jun 26, 2024, 16:35:

> Map customConverterConfigs = new HashMap<>();
> customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
> "numeric");
> PRICE is of type decimal and is displayed correctly with the setting above. ID is a NUMBER and is still shown as "ID":{"scale":0,"value":"AQ=="}
> I tried it: my ID is 1, which is MQ== after base64, so this ID value is not even the base64 of the original.
> Printing the schema via JsonDebeziumDeserializationSchema(true,
> customConverterConfigs) gives:
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"scale"},{"type":"bytes","optional":false,"field":"value"}],"optional":false,"name":"io.debezium.data.VariableScaleDecimal","version":1,"doc":"Variable
> scaled decimal","field":"ID"},
> Is there some way to set the corresponding field types up front and pass them in?
>
> From: Yanquan Lv
> Sent: 2024-06-26 14:46
> To: user-zh
> Subject: Re: Re: How to parse data read from Oracle with CDC
> Hi, are your ID and PRICE fields of type decimal? By default, decimal values are displayed using BASE64.
>
> You can add the following code to make the output more readable.
>
> Map customConverterConfigs = new HashMap<>();
> customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
> "numeric");
> JsonDebeziumDeserializationSchema schema = new
> JsonDebeziumDeserializationSchema(includeSchema,
> customConverterConfigs);
>
>
>
>
> ha.fen...@aisino.com wrote on Tue, Jun 25, 2024 at 17:26:
>
> > The data itself is fine:
> > "ID" "NAME"   "ADDTIME""PRICE"
> > 1 "aa" 2024-6-25 14:21:33  12.22
> >
> > From: 15868861416
> > Sent: 2024-06-25 17:19
> > To: user-zh@flink.apache.org
> > Subject: Re: How to parse data read from Oracle with CDC
> > Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.
> >
> >
> >
> >
> > | |
> > 博星
> > |
> > |
> > 15868861...@163.com
> > |
> >
> >
> >  Replied Message 
> > | From | ha.fen...@aisino.com |
> > | Date | 2024-06-25 15:54 |
> > | To | user-zh |
> > | Subject | How to parse data read from Oracle with CDC |
> > Following the code from the documentation:
> > JdbcIncrementalSource oracleChangeEventSource =
> > new OracleSourceBuilder()
> > .hostname("host")
> > .port(1521)
> > .databaseList("ORCLCDB")
> > .schemaList("DEBEZIUM")
> > .tableList("DEBEZIUM.PRODUCTS")
> > .username("username")
> > .password("password")
> > .deserializer(new JsonDebeziumDeserializationSchema())
> > .includeSchemaChanges(true) // output the schema changes as well
> > .startupOptions(StartupOptions.initial())
> > .debeziumProperties(debeziumProperties)
> > .splitSize(2)
> > .build();
> > The returned result:
> >
> >
> {"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}
> >
> > How should this be parsed? PRICE should be a number and ID is also a number, yet what is shown here is different for both.
> >
>


Re: Unsubscribe

2024-06-26 Thread wjw_bigd...@163.com




 Replied Message 
| From | <402987...@qq.com.INVALID> |
| Date | 2024-06-26 16:38 |
| To | user-zh |
| Cc | |
| Subject | Unsubscribe |




------------------ Original ------------------
From: "user-zh" <15171440...@163.com>;
Sent: 2024-06-26 (Wed) 16:36
To: "user-zh"

Re: Unsubscribe

2024-06-26 Thread 费文杰



















On 2024-06-26 15:07:45, "15868861416" <15868861...@163.com> wrote:
>Hi, you can try changing the types of ID and PRICE to NUMBER. On my side, Flink SQL with NUMBER columns mapped to Iceberg DECIMAL data works correctly.
>
>
>| |
>博星
>|
>|
>15868861...@163.com
>|
>
>
> Replied Message 
>| From | Yanquan Lv |
>| Date | 2024-06-26 14:46 |
>| To |  |
>| Subject | Re: Re: How to parse data read from Oracle with CDC |
>Hi, are your ID and PRICE fields of type DECIMAL? DECIMAL values are rendered with Base64 by default.
>
>You can add the following code to make the output more readable.
>
>Map customConverterConfigs = new HashMap<>();
>customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
>"numeric");
>JsonDebeziumDeserializationSchema schema = new
>JsonDebeziumDeserializationSchema(includeSchema,
>customConverterConfigs);
>
>
>
>
>ha.fen...@aisino.com wrote on Tue, Jun 25, 2024 at 17:26:
>
>The data itself is fine:
>"ID" "NAME"   "ADDTIME""PRICE"
>1 "aa" 2024-6-25 14:21:33  12.22
>
>From: 15868861416
>Sent: 2024-06-25 17:19
>To: user-zh@flink.apache.org
>Subject: Re: How to parse data read from Oracle with CDC
>Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.
>
>
>
>
>| |
>博星
>|
>|
>15868861...@163.com
>|
>
>
> Replied Message 
>| From | ha.fen...@aisino.com |
>| Date | 2024-06-25 15:54 |
>| To | user-zh |
>| Subject | How to parse data read from Oracle with CDC |
>Following the code from the documentation:
>JdbcIncrementalSource oracleChangeEventSource =
>new OracleSourceBuilder()
>.hostname("host")
>.port(1521)
>.databaseList("ORCLCDB")
>.schemaList("DEBEZIUM")
>.tableList("DEBEZIUM.PRODUCTS")
>.username("username")
>.password("password")
>.deserializer(new JsonDebeziumDeserializationSchema())
>.includeSchemaChanges(true) // output the schema changes as well
>.startupOptions(StartupOptions.initial())
>.debeziumProperties(debeziumProperties)
>.splitSize(2)
>.build();
>The returned result:
>
>{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}
>
>How should this be parsed? PRICE should be a number and ID is also a number, yet what is shown here is different for both.
>


Re: How to parse data read from Oracle with CDC

2024-06-26 Thread 15868861416
Hi, you can try changing the types of ID and PRICE to NUMBER. On my side, Flink SQL with NUMBER columns mapped to Iceberg DECIMAL data works correctly.


| |
博星
|
|
15868861...@163.com
|


 Replied Message 
| From | Yanquan Lv |
| Date | 2024-06-26 14:46 |
| To |  |
| Subject | Re: Re: How to parse data read from Oracle with CDC |
Hi, are your ID and PRICE fields of type DECIMAL? DECIMAL values are rendered with Base64 by default.

You can add the following code to make the output more readable.

Map customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
JsonDebeziumDeserializationSchema schema = new
JsonDebeziumDeserializationSchema(includeSchema,
customConverterConfigs);




ha.fen...@aisino.com wrote on Tue, Jun 25, 2024 at 17:26:

The data itself is fine:
"ID" "NAME"   "ADDTIME""PRICE"
1 "aa" 2024-6-25 14:21:33  12.22

From: 15868861416
Sent: 2024-06-25 17:19
To: user-zh@flink.apache.org
Subject: Re: How to parse data read from Oracle with CDC
Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.




| |
博星
|
|
15868861...@163.com
|


 Replied Message 
| From | ha.fen...@aisino.com |
| Date | 2024-06-25 15:54 |
| To | user-zh |
| Subject | How to parse data read from Oracle with CDC |
Following the code from the documentation:
JdbcIncrementalSource oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("ORCLCDB")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
The returned result:

{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}

How should this be parsed? PRICE should be a number and ID is also a number, yet what is shown here is different for both.



Flink write ADLS show error: No FileSystem for scheme "file"

2024-06-26 Thread Xiao Xu
Hi, all

I try to use Flink to write Azure Blob Storage which called ADLS, I put the
flink-azure-fs-hadoop jar in plugins directory and when I start my write
job it shows:

Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
FileSystem for scheme "file"
at
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262)
~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.(AbfsOutputStream.java:173)
~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580)
~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301)
~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]

I searched and the issue looks similar to this one:
https://stackoverflow.com/questions/77238642/apache-flink-azure-abfs-file-sink-error-streaming-unsupportedfilesystemexcep

my env:
Flink: 1.18.1
JDK: 1.8

Does anyone else have the same problem?


Re: Re: How to parse data read from Oracle with CDC

2024-06-26 Thread Yanquan Lv
Hi, are your ID and PRICE fields of type DECIMAL? DECIMAL values are rendered with Base64 by default.

You can add the following code to make the output more readable.

Map customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
JsonDebeziumDeserializationSchema schema = new
JsonDebeziumDeserializationSchema(includeSchema,
customConverterConfigs);




ha.fen...@aisino.com wrote on Tue, Jun 25, 2024 at 17:26:

> The data itself is fine:
> "ID" "NAME"   "ADDTIME""PRICE"
> 1 "aa" 2024-6-25 14:21:33  12.22
>
> From: 15868861416
> Sent: 2024-06-25 17:19
> To: user-zh@flink.apache.org
> Subject: Re: How to parse data read from Oracle with CDC
> Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.
>
>
>
>
> | |
> 博星
> |
> |
> 15868861...@163.com
> |
>
>
>  Replied Message 
> | From | ha.fen...@aisino.com |
> | Date | 2024-06-25 15:54 |
> | To | user-zh |
> | Subject | How to parse data read from Oracle with CDC |
> Following the code from the documentation:
> JdbcIncrementalSource oracleChangeEventSource =
> new OracleSourceBuilder()
> .hostname("host")
> .port(1521)
> .databaseList("ORCLCDB")
> .schemaList("DEBEZIUM")
> .tableList("DEBEZIUM.PRODUCTS")
> .username("username")
> .password("password")
> .deserializer(new JsonDebeziumDeserializationSchema())
> .includeSchemaChanges(true) // output the schema changes as well
> .startupOptions(StartupOptions.initial())
> .debeziumProperties(debeziumProperties)
> .splitSize(2)
> .build();
> The returned result:
>
> {"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}
>
> How should this be parsed? PRICE should be a number and ID is also a number, yet what is shown here is different for both.
>


Re: [External] Re:Re: Backpressure issue with Flink Sql Job

2024-06-25 Thread Ashish Khatkar via user
Hi Xuyang,

The input records are balanced across subtasks, with debloating buffers
enabled, the records this subtask receives is less as compared to other
subtasks.

If the differences among all subtasks are not significant, we might be
> encountering an IO bottleneck. In this case, we could try increasing the
> parallelism of this vertex, or, as Penny suggested, we could try to enhance
> the memory of tm.


I checked k8s metrics for the taskmanagers and I don't see any IO issues, I
can relook at it again at the time we started backfill. Regarding
parallelism, I don't think Flink sql allows separate parallelism for a
specific operator. I can try to increase the parallelism for the entire job
(which will probably be counter productive for sources as they have 20
partitions). Regarding memory, I have already given 72G mem to each
taskmanager (slot, 1 tm. = 1 slot). We are currently migrating join use
cases from our in house system that we built on top of datastream API and
that system never required this much of resources and we never faced any
such issues for this job.

On Tue, Jun 25, 2024 at 7:30 AM Xuyang  wrote:

> Hi, Ashish.
>
> Can you confirm whether, on the subtask label page of this sink
> materializer node, the input records for each subtask are approximately the
> same?
>
> If the input records for subtask number 5 are significantly larger
> compared to the others, it signifies a serious data skew, and it would be
> necessary to modify the SQL appropriately to resolve this skew.
>
> If the differences among all subtasks are not significant, we might be
> encountering an IO bottleneck. In this case, we could try increasing the
> parallelism of this vertex, or, as Penny suggested, we could try to enhance
> the memory of tm.
>
>
> --
> Best!
> Xuyang
>
>
> 在 2024-06-24 21:28:58,"Penny Rastogi"  写道:
>
> Hi Ashish,
> Can you check a few things.
> 1. Is your source broker count also 20 for both topics?
> 2. You can try increasing the state operation memory and reduce the disk
> I/O.
>
>    - Increase the number of CU resources in a single slot.
>    - Set optimization parameters:
>      - taskmanager.memory.managed.fraction=x
>      - state.backend.rocksdb.block.cache-size=x
>      - state.backend.rocksdb.writebuffer.size=x
> 3. If possible, try left window join for your streams.
>
> Please, share what sink you are using. Also, the per-operator,
> source and sink throughput, if possible?
>
>
> On Mon, Jun 24, 2024 at 3:32 PM Ashish Khatkar via user <
> user@flink.apache.org> wrote:
>
>> Hi all,
>>
>> We are facing backpressure in the flink sql job from the sink and the
>> backpressure only comes from a single task. This causes the checkpoint to
>> fail despite enabling unaligned checkpoints and using debloating buffers.
>> We enabled flamegraph and the task spends most of the time doing rocksdb
>> get and put. The sql job does a left join over two streams with a
>> parallelism of 20. The total data the topics have is 540Gb for one topic
>> and roughly 60Gb in the second topic. We are running 20 taskmanagers with 1
>> slot each with each taskmanager having 72G mem and 9 cpu.
>> Can you provide any help on how to go about fixing the pipeline? We are
>> using Flink 1.17.2. The issue is similar to this stackoverflow thread
>> ,
>> instead of week it starts facing back pressure as soon as the lag comes
>> down to 4-5%.
>>
>> [image: image.png]
>>
>


Re: How to parse data read from Oracle with CDC

2024-06-25 Thread wjw_bigd...@163.com
Unsubscribing is really a hassle... I have tried to unsubscribe several times and still haven't figured it out.



 Replied Message 
| From | ha.fen...@aisino.com |
| Date | 2024-06-25 17:25 |
| To | user-zh |
| Cc | |
| Subject | Re: Re: How to parse data read from Oracle with CDC |
The data itself is fine:
"ID" "NAME"   "ADDTIME""PRICE"
1 "aa" 2024-6-25 14:21:33  12.22

From: 15868861416
Sent: 2024-06-25 17:19
To: user-zh@flink.apache.org
Subject: Re: How to parse data read from Oracle with CDC
Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.




| |
博星
|
|
15868861...@163.com
|


 Replied Message 
| From | ha.fen...@aisino.com |
| Date | 2024-06-25 15:54 |
| To | user-zh |
| Subject | How to parse data read from Oracle with CDC |
Following the code from the documentation:
JdbcIncrementalSource oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("ORCLCDB")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
The returned result:
{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}

How should this be parsed? PRICE should be a number and ID is also a number, yet what is shown here is different for both.


Re: How to parse data read from Oracle with CDC

2024-06-25 Thread 15868861416
Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.




| |
博星
|
|
15868861...@163.com
|


 Replied Message 
| From | ha.fen...@aisino.com |
| Date | 2024-06-25 15:54 |
| To | user-zh |
| Subject | How to parse data read from Oracle with CDC |
Following the code from the documentation:
JdbcIncrementalSource oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("ORCLCDB")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
The returned result:
{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}

How should this be parsed? PRICE should be a number and ID is also a number, yet what is shown here is different for both.


Re: flink kubernetes flink autoscale behavior

2024-06-25 Thread Enric Ott
Thanks, Rion. I roughly understand. And I still want to know whether the
deployment of Adaptive Scheduler relies on Kubernetes? Are
there any cases of deploying Flink Adaptive Scheduler on bare metal
machines?
Appreciated again.




------------------ Original ------------------
From: "Rion Williams"


How to apply a fixed-rate limit in Flink's AsyncWriter? There seems to be a bug here

2024-06-25 Thread jinzhuguang
Flink 1.16.0

I found a related community article; its example is as follows:
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/#rationale-behind-the-ratelimitingstrategy-interface


public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy {

    private final Bucket bucket;

    public TokenBucketRateLimitingStrategy() {
        Refill refill = Refill.intervally(1, Duration.ofSeconds(1));
        Bandwidth limit = Bandwidth.classic(10, refill);
        this.bucket = Bucket4j.builder()
                .addLimit(limit)
                .build();
    }

    // ... (information methods not needed)

    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        return bucket.tryConsume(requestInfo.getBatchSize());
    }
}



But the return value of this shouldBlock seems to be inverted: when I actually used it, I found that the async thread pool's queue fills up very quickly and a RejectedExecutionException is thrown.
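
A note that is not part of the original message: if RateLimitingStrategy#shouldBlock is meant to return true when the request should be blocked, then the blog example quoted above would indeed need its return value negated, which matches the behavior described here. A minimal sketch of that change, assuming the same Bucket4j token bucket as above:

    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        // tryConsume(...) returns true when enough tokens were available and
        // consumed, i.e. when the request should be allowed through, so the
        // result is negated to signal "block" only when tokens are exhausted.
        return !bucket.tryConsume(requestInfo.getBatchSize());
    }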



Re: flink kubernetes flink autoscale behavior

2024-06-24 Thread Rion Williams
Hi Eric,

I believe you might be referring to use of the adaptive scheduler, which should support these "in-place" scaling operations via:

jobmanager.scheduler: adaptive

You can see the documentation for Elastic Scaling here for additional details and configuration.

On Jun 24, 2024, at 11:56 PM, Enric Ott <243816...@qq.com> wrote:

Hello, Community:
  I've recently started using the Flink Kubernetes Operator, and I'd like to know if CPU and Job Parallelism autoscaling are supported without restarting the whole job. If it's supported, please tell me how to configure and deploy it.
  Thanks.
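
Not part of the original reply: a minimal sketch of where that setting would sit in an operator-managed deployment. The resource name and Flink version below are illustrative assumptions; only the jobmanager.scheduler key comes from the message above.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-job                    # hypothetical name
spec:
  flinkVersion: v1_18
  flinkConfiguration:
    jobmanager.scheduler: adaptive     # the setting referenced above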

Re:Re: Backpressure issue with Flink Sql Job

2024-06-24 Thread Xuyang
Hi, Ashish. 

Can you confirm whether, on the subtask label page of this sink materializer 
node, the input records for each subtask are approximately the same? 

If the input records for subtask number 5 are significantly larger compared to 
the others, it signifies a serious data skew, and it would be necessary to 
modify the SQL appropriately to resolve this skew. 

If the differences among all subtasks are not significant, we might be 
encountering an IO bottleneck. In this case, we could try increasing the 
parallelism of this vertex, or, as Penny suggested, we could try to enhance the 
memory of tm.




--

Best!
Xuyang




On 2024-06-24 21:28:58, "Penny Rastogi" wrote:

Hi Ashish,
Can you check a few things.
1. Is your source broker count also 20 for both topics?
2. You can try increasing the state operation memory and reduce the disk I/O.
Increase the number of CU resources in a single slot.
Set optimization parameters:
taskmanager.memory.managed.fraction=x
state.backend.rocksdb.block.cache-size=x
state.backend.rocksdb.writebuffer.size=x
3. If possible, try left window join for your streams


Please, share what sink you are using. Also, the per-operator, source and sink 
throughput, if possible?


On Mon, Jun 24, 2024 at 3:32 PM Ashish Khatkar via user  
wrote:

Hi all,


We are facing backpressure in the flink sql job from the sink and the 
backpressure only comes from a single task. This causes the checkpoint to fail 
despite enabling unaligned checkpoints and using debloating buffers. We enabled 
flamegraph and the task spends most of the time doing rocksdb get and put. The 
sql job does a left join over two streams with a parallelism of 20. The total 
data the topics have is 540Gb for one topic and roughly 60Gb in the second 
topic. We are running 20 taskmanagers with 1 slot each with each taskmanager 
having 72G mem and 9 cpu.

Can you provide any help on how to go about fixing the pipeline? We are using 
Flink 1.17.2. The issue is similar to this stackoverflow thread, instead of 
week it starts facing back pressure as soon as the lag comes down to 4-5%. 



Re: Understanding flink-autoscaler behavior

2024-06-24 Thread Zhanghao Chen
You can try session mode with only one job, but still with adaptive scheduler 
disabled. When stopping a session job, the TMs won't be released immediately 
and can be reused later.

Best,
Zhanghao Chen

From: Chetas Joshi 
Sent: Tuesday, June 25, 2024 1:59
To: Zhanghao Chen 
Cc: Gyula Fóra ; Oscar Perez via user 

Subject: Re: Understanding flink-autoscaler behavior

Hello,

After disabling the adaptive scheduler, I was able to have the operator stop 
the job with a savepoint, and resume the job from that savepoint after the 
upgrade. However I observed that the upgrade life cycle is quite slow as it 
takes down and then brings back up all the task managers. I am wondering if 
there are ways to have the operator stop the job with savepoint and resume from 
it without taking down the task managers. If I switch from application mode to 
session mode (only one job on that session cluster) and use the adaptive 
scheduler, does it solve the problem?

Thanks
Chetas

On Wed, Jun 12, 2024 at 7:13 PM Chetas Joshi <chetas.jo...@gmail.com> wrote:
Got it. Thanks!

On Wed, Jun 12, 2024 at 6:49 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
> Does this mean it won't trigger a checkpoint before scaling up or scaling 
> down?

The in-place rescaling won't do that.

> Do I need the autotuning to be turned on for exactly once processing?

It suffices to just go back to the full-restart upgrade mode provided by the 
operator: disable adaptive scheduler, and set the upgradeMode to savepoint [1]. 
Operator will stop the job with a final savepoint, and resume the job from that 
savepoint after the upgrade.

[1] 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades


Best,
Zhanghao Chen

From: Chetas Joshi <chetas.jo...@gmail.com>
Sent: Thursday, June 13, 2024 6:33
To: Zhanghao Chen <zhanghao.c...@outlook.com>
Cc: Sachin Sharma <sachinapr...@gmail.com>;
Gyula Fóra <gyula.f...@gmail.com>; Oscar Perez via
user <user@flink.apache.org>

Subject: Re: Understanding flink-autoscaler behavior

Hi Zhanghao,

If I am using the autoscaler (flink-k8s-operator) without enabling auto-tuning, 
the documentation says it triggers in-place scaling without going through a 
full job upgrade lifecycle. Does this mean it won't trigger a checkpoint before 
scaling up or scaling down?

I am observing that entries are being read from Kafka (the source in my flink 
app) multiple times if the flink app goes through scaling. Do I need the 
autotuning to be turned on for exactly once processing?

Thanks
Chetas

On Fri, Jun 7, 2024 at 8:28 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
Hi,

Reactive mode and the Autoscaler in Kubernetes operator are two different 
approaches towards elastic scaling of Flink.

Reactive mode [1] has to be used together with the passive resource management 
approach of Flink (only Standalone mode takes this approach), where the TM 
number is controlled by external systems and the job parallelism is derived 
from the available resources in a reactive manner. The workflow is as follows:

External service (like K8s HPA controller) monitors job load and adjust TM 
number to match the load -> Flink adjust job parallelism reactively.

The Autoscaler [2] is to be used with the active resource management approach 
of Flink (YARN/K8s Application/Session mode), where The TM number is derived 
from the job resource need, and Flink actively requests/releases TMs to match 
job need. The workflow for the Autoscaler is:

Autoscaler monitors job load and adjusts job parallelism actively to match the 
load -> Flink adjusts TM number to match the job need.

I would recommend using the Autoscaler instead of the Reactive mode for:

  1.
The Autoscaler provides a all-in-one solution while Reactive mode needs to be 
paired with an external TM number adjusting system.
  2.
The Autoscaler has a fine-tuned scaling algorithm tailored for streaming 
workloads. It will fine-tune each operator's parallelism and take task load 
balancing in mind while reactive mode just adjust all operator's parallelism 
uniformly.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#reactive-mode
[2] 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/


Best,
Zhanghao Chen

From: Sachin Sharma <sachinapr...@gmail.com>
Sent: Saturday, June 8, 2024 3:02
To: Gyula Fóra <gyula.f...@gmail.com>
Cc: Chetas Joshi <chetas.jo...@gmail.com>; Oscar
Perez via user <user@flink.apache.org>
Subject: Re: Understanding flink-autoscaler behavior

Hi,

I have a question related to this.

I am doing a POC with Kubernetes operator 1.8 and flink 1.18 version with 
Reactive mode enabled, I added some 

flink kubernetes flink autoscale behavior

2024-06-24 Thread Enric Ott
Hello, Community:
  I've recently started using the Flink Kubernetes Operator, and I'd like
to know if CPU and Job Parallelism autoscaling are supported without restarting
the whole job. If it's supported, please tell me how to configure and deploy
it.
  Thanks.

Re: Understanding flink-autoscaler behavior

2024-06-24 Thread Chetas Joshi
Hello,

After disabling the adaptive scheduler, I was able to have the operator stop
the job with a savepoint, and resume the job from that savepoint after the
upgrade. However I observed that the upgrade life cycle is quite slow as it
takes down and then brings back up all the task managers. I am wondering if
there are ways to have the operator stop the job with savepoint and resume
from it without taking down the task managers. If I switch from application
mode to session mode (only one job on that session cluster) and use the
adaptive scheduler, does it solve the problem?

Thanks
Chetas

On Wed, Jun 12, 2024 at 7:13 PM Chetas Joshi  wrote:

> Got it. Thanks!
>
> On Wed, Jun 12, 2024 at 6:49 PM Zhanghao Chen 
> wrote:
>
>> > Does this mean it won't trigger a checkpoint before scaling up or
>> scaling down?
>>
>> The in-place rescaling won't do that.
>>
>> > Do I need the autotuning to be turned on for exactly once processing?
>>
>> It suffices to just go back to the full-restart upgrade mode provided by
>> the operator: disable adaptive scheduler, and set the upgradeMode to
>> savepoint [1]. Operator will stop the job with a final savepoint, and
>> resume the job from that savepoint after the upgrade.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* Chetas Joshi 
>> *Sent:* Thursday, June 13, 2024 6:33
>> *To:* Zhanghao Chen 
>> *Cc:* Sachin Sharma ; Gyula Fóra <
>> gyula.f...@gmail.com>; Oscar Perez via user 
>>
>> *Subject:* Re: Understanding flink-autoscaler behavior
>>
>> Hi Zhanghao,
>>
>> If I am using the autoscaler (flink-k8s-operator) without enabling
>> auto-tuning, the documentation says it triggers in-place scaling without
>> going through a full job upgrade lifecycle. Does this mean it won't trigger
>> a checkpoint before scaling up or scaling down?
>>
>> I am observing that entries are being read from Kafka (the source in my
>> flink app) multiple times if the flink app goes through scaling. Do I need
>> the autotuning to be turned on for exactly once processing?
>>
>> Thanks
>> Chetas
>>
>> On Fri, Jun 7, 2024 at 8:28 PM Zhanghao Chen 
>> wrote:
>>
>> Hi,
>>
>> Reactive mode and the Autoscaler in Kubernetes operator are two different
>> approaches towards elastic scaling of Flink.
>>
>> Reactive mode [1] has to be used together with the passive resource
>> management approach of Flink (only Standalone mode takes this approach),
>> where the TM number is controlled by external systems and the job
>> parallelism is derived from the available resources in a reactive manner.
>> The workflow is as follows:
>>
>> External service (like K8s HPA controller) monitors job load and adjust
>> TM number to match the load -> Flink adjust job parallelism *reactively*.
>>
>> The Autoscaler [2] is to be used with the active resource management
>> approach of Flink (YARN/K8s Application/Session mode), where The TM number
>> is derived from the job resource need, and Flink actively requests/releases
>> TMs to match job need. The workflow for the Autoscaler is:
>>
>> Autoscaler monitors job load and adjusts job parallelism *actively* to
>> match the load -> Flink adjusts TM number to match the job need.
>>
>> I would recommend using the Autoscaler instead of the Reactive mode for:
>>
>>1. The Autoscaler provides a all-in-one solution while Reactive mode
>>needs to be paired with an external TM number adjusting system.
>>2. The Autoscaler has a fine-tuned scaling algorithm tailored for
>>streaming workloads. It will fine-tune each operator's parallelism and 
>> take
>>task load balancing in mind while reactive mode just adjust all operator's
>>parallelism uniformly.
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#reactive-mode
>> [2]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/
>>
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* Sachin Sharma 
>> *Sent:* Saturday, June 8, 2024 3:02
>> *To:* Gyula Fóra 
>> *Cc:* Chetas Joshi ; Oscar Perez via user <
>> user@flink.apache.org>
>> *Subject:* Re: Understanding flink-autoscaler behavior
>>
>> Hi,
>>
>> I have a question related to this.
>>
>> I am doing a POC with Kubernetes operator 1.8 and flink 1.18 version with
>> Reactive mode enabled, I added some dummy slow and fast operator to the
>> flink job and i can see there is a back pressure accumulated. but i am not
>> sure why my Flink task managers are not scaled by the operator. Also, can
>> someone explain if autoscalers job is just to add more task manager and
>> then Reactive mode will adjust the parallelism based on the configurations?
>> As per the Operator documentation - Users configure the target
>> utilization percentage of the operators 

Re: Backpressure issue with Flink Sql Job

2024-06-24 Thread Penny Rastogi
Hi Ashish,
Can you check a few things.
1. Is your source broker count also 20 for both topics?
2. You can try increasing the state operation memory and reduce the disk
I/O.

   - Increase the number of CU resources in a single slot.
   - Set optimization parameters (a configuration sketch follows this list):
     - taskmanager.memory.managed.fraction=x
     - state.backend.rocksdb.block.cache-size=x
     - state.backend.rocksdb.writebuffer.size=x
3. If possible, try left window join for your streams.

Please, share what sink you are using. Also, the per-operator, source
and sink throughput, if possible?
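
A flink-conf.yaml style sketch of the three knobs named above, not part of the original message; the values are illustrative assumptions only, not recommendations from this thread:

# Illustrative values only; tune against your own state size and heap profile.
taskmanager.memory.managed.fraction: 0.6        # give RocksDB a larger share of TM memory
state.backend.rocksdb.block.cache-size: 256mb   # larger block cache to cut read I/O
state.backend.rocksdb.writebuffer.size: 128mb   # bigger memtables, fewer flushes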


On Mon, Jun 24, 2024 at 3:32 PM Ashish Khatkar via user <
user@flink.apache.org> wrote:

> Hi all,
>
> We are facing backpressure in the flink sql job from the sink and the
> backpressure only comes from a single task. This causes the checkpoint to
> fail despite enabling unaligned checkpoints and using debloating buffers.
> We enabled flamegraph and the task spends most of the time doing rocksdb
> get and put. The sql job does a left join over two streams with a
> parallelism of 20. The total data the topics have is 540Gb for one topic
> and roughly 60Gb in the second topic. We are running 20 taskmanagers with 1
> slot each with each taskmanager having 72G mem and 9 cpu.
> Can you provide any help on how to go about fixing the pipeline? We are
> using Flink 1.17.2. The issue is similar to this stackoverflow thread
> ,
> instead of week it starts facing back pressure as soon as the lag comes
> down to 4-5%.
>
> [image: image.png]
>


Backpressure issue with Flink Sql Job

2024-06-24 Thread Ashish Khatkar via user
Hi all,

We are facing backpressure in the flink sql job from the sink and the
backpressure only comes from a single task. This causes the checkpoint to
fail despite enabling unaligned checkpoints and using debloating buffers.
We enabled flamegraph and the task spends most of the time doing rocksdb
get and put. The sql job does a left join over two streams with a
parallelism of 20. The total data the topics have is 540Gb for one topic
and roughly 60Gb in the second topic. We are running 20 taskmanagers with 1
slot each with each taskmanager having 72G mem and 9 cpu.
Can you provide any help on how to go about fixing the pipeline? We are
using Flink 1.17.2. The issue is similar to this stackoverflow thread
,
instead of week it starts facing back pressure as soon as the lag comes
down to 4-5%.

[image: image.png]


Re: Reminder: Help required to fix security vulnerabilities in Flink Docker image

2024-06-23 Thread elakiya udhayanan
Hi  Alexis and Gabor ,

Thanks for your valuable response and suggestions. Will try to work on the
suggestions and get back to you if require more details.

Thanks,
Elakiya

On Sun, Jun 23, 2024 at 10:12 PM Gabor Somogyi 
wrote:

> Hi Elakiya,
>
> I've just double checked the story and seems like the latest 1.17 gosu
> release is not vulnerable.
> Can you please try it out on your side? Alexis has written down how you
> can bump the docker version locally:
>
> ---CUT-HERE---
> ENV GOSU_VERSION 1.17
> ---CUT-HERE---
>
> Please report back and we can discuss this further based on that...
>
> BR,
> G
>
>
> On Fri, Jun 21, 2024 at 7:16 PM elakiya udhayanan 
> wrote:
>
>> Hi Team,
>>
>> I would like to remind about the request for the help required to fix the
>> vulnerabilities seen in the Flink Docker image. Any help is appreciated.
>>
>> Thanks in advance.
>>
>> Thanks,
>> Elakiya U
>>
>> On Tue, Jun 18, 2024 at 12:51 PM elakiya udhayanan 
>> wrote:
>>
>>> Hi Community,
>>>
>>> In one of our applications we are using a Fink Docker image and running
>>> Flink as a Kubernetes pod. As per policy, we tried scanning the Docker
>>> image for security vulnerabilities using JFrog XRay and we find that there
>>> are multiple critical vulnerabilities being reported as seen in the below
>>> table. This is the same case for the latest Flink version 1.19.0 as well
>>>
>>> | Severity  | Direct Package   | Impacted Package  |
>>> Impacted Package Version | Fixed Versions | Type  | CVE
>>>|
>>>
>>> |---|--|---|---||---||
>>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go
>>>  | 1.11.1| [1.19.8, 1.20.3]   | Go|
>>> CVE-2023-24538 |
>>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go
>>>  | 1.11.1| [1.19.9, 1.20.4]   | Go|
>>> CVE-2023-24540 |
>>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go
>>>  | 1.11.1| [1.19.10, 1.20.5]  | Go|
>>> CVE-2023-29404 |
>>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go
>>>  | 1.11.1| [1.19.10, 1.20.5]  | Go|
>>> CVE-2023-29405 |
>>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go
>>>  | 1.11.1| [1.19.10, 1.20.5]  | Go|
>>> CVE-2023-29402 |
>>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go
>>>  | 1.11.1| [1.16.9, 1.17.2]   | Go|
>>> CVE-2021-38297 |
>>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go
>>>  | 1.11.1| [1.16.14, 1.17.7]  | Go|
>>> CVE-2022-23806 |
>>> | Critical  | sha256__0690274ef266a9a2f... | certifi   |
>>> 2020.6.20 | [2023.7.22]| Python|
>>> CVE-2023-37920 |
>>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go
>>>  | 1.11.1| [1.12.6, 1.13beta1]| Go|
>>> CVE-2019-11888 |
>>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go
>>>  | 1.11.1| [1.11.13, 1.12.8]  | Go|
>>> CVE-2019-14809 |
>>>
>>> These vulnerabilities are related to the github.com/golang/go and
>>> certifi packages.
>>>
>>> Please help me addressing the below questions:
>>> Is there any known workaround for these vulnerabilities while using the
>>> affected Flink versions?
>>> Is there an ETA for a fix for these vulnerabilities in upcoming Flink
>>> releases?
>>> Are there any specific steps recommended to mitigate these issues in the
>>> meantime?
>>> Any guidance or recommendations would be greatly appreciated.
>>>
>>> Thanks in advance
>>>
>>> Thanks,
>>> Elakiya U
>>>
>>


Re: Reminder: Help required to fix security vulnerabilities in Flink Docker image

2024-06-23 Thread Gabor Somogyi
Hi Elakiya,

I've just double checked the story and seems like the latest 1.17 gosu
release is not vulnerable.
Can you please try it out on your side? Alexis has written down how you can
bump the docker version locally:

---CUT-HERE---
ENV GOSU_VERSION 1.17
---CUT-HERE---

Please report back and we can discuss this further based on that...

BR,
G


On Fri, Jun 21, 2024 at 7:16 PM elakiya udhayanan 
wrote:

> Hi Team,
>
> I would like to remind about the request for the help required to fix the
> vulnerabilities seen in the Flink Docker image. Any help is appreciated.
>
> Thanks in advance.
>
> Thanks,
> Elakiya U
>
> On Tue, Jun 18, 2024 at 12:51 PM elakiya udhayanan 
> wrote:
>
>> Hi Community,
>>
>> In one of our applications we are using a Fink Docker image and running
>> Flink as a Kubernetes pod. As per policy, we tried scanning the Docker
>> image for security vulnerabilities using JFrog XRay and we find that there
>> are multiple critical vulnerabilities being reported as seen in the below
>> table. This is the same case for the latest Flink version 1.19.0 as well
>>
>> | Severity  | Direct Package   | Impacted Package  |
>> Impacted Package Version | Fixed Versions | Type  | CVE
>>|
>>
>> |---|--|---|---||---||
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.19.8, 1.20.3]   | Go|
>> CVE-2023-24538 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.19.9, 1.20.4]   | Go|
>> CVE-2023-24540 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.19.10, 1.20.5]  | Go|
>> CVE-2023-29404 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.19.10, 1.20.5]  | Go|
>> CVE-2023-29405 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.19.10, 1.20.5]  | Go|
>> CVE-2023-29402 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.16.9, 1.17.2]   | Go|
>> CVE-2021-38297 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.16.14, 1.17.7]  | Go|
>> CVE-2022-23806 |
>> | Critical  | sha256__0690274ef266a9a2f... | certifi   |
>> 2020.6.20 | [2023.7.22]| Python|
>> CVE-2023-37920 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.12.6, 1.13beta1]| Go|
>> CVE-2019-11888 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.11.13, 1.12.8]  | Go|
>> CVE-2019-14809 |
>>
>> These vulnerabilities are related to the github.com/golang/go and
>> certifi packages.
>>
>> Please help me addressing the below questions:
>> Is there any known workaround for these vulnerabilities while using the
>> affected Flink versions?
>> Is there an ETA for a fix for these vulnerabilities in upcoming Flink
>> releases?
>> Are there any specific steps recommended to mitigate these issues in the
>> meantime?
>> Any guidance or recommendations would be greatly appreciated.
>>
>> Thanks in advance
>>
>> Thanks,
>> Elakiya U
>>
>


Re: Reminder: Help required to fix security vulnerabilities in Flink Docker image

2024-06-21 Thread Alexis Sarda-Espinosa
Hi Elakiya,

just to be clear, I'm not a Flink maintainer, but here my 2 cents.

I imagine the issues related to Go come from 'gosu', which is installed in
the official Flink Docker images. You can see [1] for some thoughts from
the gosu maintainer regarding CVEs (and the md file he links).

Nevertheless, there have been newer gosu releases and Flink hasn't updated
it in a long time, and I think it could be worth doing that, it seems to me
that it's just about changing one env var, e.g. [2].

[1] https://github.com/tianon/gosu/issues/136#issuecomment-2150375314
[2]
https://github.com/apache/flink-docker/blob/master/1.19/scala_2.12-java11-ubuntu/Dockerfile#L28

Regards,
Alexis.

Am Fr., 21. Juni 2024 um 15:37 Uhr schrieb elakiya udhayanan <
laks@gmail.com>:

> Hi Team,
>
> I would like to remind about the request for the help required to fix the
> vulnerabilities seen in the Flink Docker image. Any help is appreciated.
>
> Thanks in advance.
>
> Thanks,
> Elakiya U
>
> On Tue, Jun 18, 2024 at 12:51 PM elakiya udhayanan 
> wrote:
>
>> Hi Community,
>>
>> In one of our applications we are using a Fink Docker image and running
>> Flink as a Kubernetes pod. As per policy, we tried scanning the Docker
>> image for security vulnerabilities using JFrog XRay and we find that there
>> are multiple critical vulnerabilities being reported as seen in the below
>> table. This is the same case for the latest Flink version 1.19.0 as well
>>
>> | Severity  | Direct Package   | Impacted Package  |
>> Impacted Package Version | Fixed Versions | Type  | CVE
>>|
>>
>> |---|--|---|---||---||
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.19.8, 1.20.3]   | Go|
>> CVE-2023-24538 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.19.9, 1.20.4]   | Go|
>> CVE-2023-24540 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.19.10, 1.20.5]  | Go|
>> CVE-2023-29404 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.19.10, 1.20.5]  | Go|
>> CVE-2023-29405 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.19.10, 1.20.5]  | Go|
>> CVE-2023-29402 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.16.9, 1.17.2]   | Go|
>> CVE-2021-38297 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.16.14, 1.17.7]  | Go|
>> CVE-2022-23806 |
>> | Critical  | sha256__0690274ef266a9a2f... | certifi   |
>> 2020.6.20 | [2023.7.22]| Python|
>> CVE-2023-37920 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.12.6, 1.13beta1]| Go|
>> CVE-2019-11888 |
>> | Critical  | sha256__c6571bb0f39f334ef... | github.com/golang/go  |
>> 1.11.1| [1.11.13, 1.12.8]  | Go|
>> CVE-2019-14809 |
>>
>> These vulnerabilities are related to the github.com/golang/go and
>> certifi packages.
>>
>> Please help me addressing the below questions:
>> Is there any known workaround for these vulnerabilities while using the
>> affected Flink versions?
>> Is there an ETA for a fix for these vulnerabilities in upcoming Flink
>> releases?
>> Are there any specific steps recommended to mitigate these issues in the
>> meantime?
>> Any guidance or recommendations would be greatly appreciated.
>>
>> Thanks in advance
>>
>> Thanks,
>> Elakiya U
>>
>


Reminder: Help required to fix security vulnerabilities in Flink Docker image

2024-06-21 Thread elakiya udhayanan
Hi Team,

I would like to remind about the request for the help required to fix the
vulnerabilities seen in the Flink Docker image. Any help is appreciated.

Thanks in advance.

Thanks,
Elakiya U

On Tue, Jun 18, 2024 at 12:51 PM elakiya udhayanan 
wrote:

> Hi Community,
>
> In one of our applications we are using a Fink Docker image and running
> Flink as a Kubernetes pod. As per policy, we tried scanning the Docker
> image for security vulnerabilities using JFrog XRay and we find that there
> are multiple critical vulnerabilities being reported as seen in the below
> table. This is the same case for the latest Flink version 1.19.0 as well
>
> | Severity | Direct Package               | Impacted Package     | Impacted Package Version | Fixed Versions      | Type   | CVE            |
> |----------|------------------------------|----------------------|--------------------------|---------------------|--------|----------------|
> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.8, 1.20.3]    | Go     | CVE-2023-24538 |
> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.9, 1.20.4]    | Go     | CVE-2023-24540 |
> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.10, 1.20.5]   | Go     | CVE-2023-29404 |
> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.10, 1.20.5]   | Go     | CVE-2023-29405 |
> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.10, 1.20.5]   | Go     | CVE-2023-29402 |
> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.16.9, 1.17.2]    | Go     | CVE-2021-38297 |
> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.16.14, 1.17.7]   | Go     | CVE-2022-23806 |
> | Critical | sha256__0690274ef266a9a2f... | certifi              | 2020.6.20                | [2023.7.22]         | Python | CVE-2023-37920 |
> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.12.6, 1.13beta1] | Go     | CVE-2019-11888 |
> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.11.13, 1.12.8]   | Go     | CVE-2019-14809 |
>
> These vulnerabilities are related to the github.com/golang/go and certifi
> packages.
>
> Please help me addressing the below questions:
> Is there any known workaround for these vulnerabilities while using the
> affected Flink versions?
> Is there an ETA for a fix for these vulnerabilities in upcoming Flink
> releases?
> Are there any specific steps recommended to mitigate these issues in the
> meantime?
> Any guidance or recommendations would be greatly appreciated.
>
> Thanks in advance
>
> Thanks,
> Elakiya U
>


Re: A way to meter number of deserialization errors

2024-06-21 Thread Ilya Karpov
I guess metering deserialization errors could be done with Flink metrics,
but right now it is missing out of the box. (A rough sketch of such a hook follows below.)
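
Not part of the original thread: a minimal sketch of the kind of hook discussed below, assuming a DataStream-style wrapper around another DeserializationSchema that counts parse failures with a user-scoped counter. The class and metric names are made up for illustration; the SQL protobuf format itself does not expose such a hook today, as noted above.

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.Counter;

/** Delegates to an inner schema and counts records that fail to parse. */
public class ErrorCountingDeserializationSchema<T> implements DeserializationSchema<T> {

    private final DeserializationSchema<T> inner;
    private transient Counter parseErrors;

    public ErrorCountingDeserializationSchema(DeserializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        inner.open(context);
        parseErrors = context.getMetricGroup().counter("deserializeErrors");
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            T record = inner.deserialize(message);
            if (record == null) {      // inner schema silently skipped the record
                parseErrors.inc();
            }
            return record;
        } catch (IOException e) {
            parseErrors.inc();         // count and skip instead of failing the job
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}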

> - I wondered if you might also want to count how many were successfully 
> parsed in a non-protobuf layer (Dynamic table sort of level)
No, this is not a requirement. I have a simple flink-sql job that
streams proto-messages from kafka to hive, with quite simple mapping.

> - do you know why some messages do not parse? It would be interesting to know 
> the types of non-parsable messages, I assume with the idea to understand and 
> potentially fix the source / parser limitations.
Because of a human mistake: developers can introduce non-backward-compatible
changes in the producer, and the emitted proto messages will then fail to be
deserialized on the Flink side. Failing the Flink job is actually an
option worth considering (because right now it is the only source of
proto messages in the topic and it is OK to halt and repair).


ср, 19 июн. 2024 г. в 11:39, David Radley :
>
> Hi Ilya,
>
> I have not got any experience of doing this, but wonder if we could use the 
> Flink Metrics . I wonder:
>
> - There could be hook point at that part of the code to discover some custom 
> code that implements the metrics.
>
> - A better way might be to throw a new unable-to-parse Exception that could 
> be caught in non-protobuf code and then a count metric be incremented. I 
> guess the metrics would be scoped to a Job – like the watermark metrics.
>
> - I wondered if you might also want to count how many were successfully 
> parsed in a non-protobuf layer (Dynamic table sort of level)
>
> - do you know why some messages do not parse? It would be interesting to know 
> the types of non-parsable messages, I assume with the idea to understand and 
> potentially fix the source / parser limitations.
>
>
>
> Kind regards, David.
>
>
>
> From: Ilya Karpov 
> Date: Monday, 17 June 2024 at 12:39
> To: user 
> Subject: [EXTERNAL] A way to meter number of deserialization errors
>
>
> Hi all,
>
> we are planning to use flink as a connector between kafka and external 
> systems. We use protobuf as a message format in kafka. If non-backward 
> compatible changes occur we want to skip those messages 
> ('protobuf.ignore-parse-errors' = 'true') but record an error and raise an 
> alert. I didn't find any way to record deserialization errors 
> (https://github.com/apache/flink/blob/3a15d1ce69ac21d619f60033ec45cae303489c8f/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java#L73),
>  does it exist?
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU


Re: Elasticsearch 8 - FLINK-26088

2024-06-20 Thread Tauseef Janvekar
Dear Team,

I see that the connector version referred to is elasticsearch-3.1.0, but I
am not sure where I can get sample code using this artifact or how to
download it.

Any help is appreciated.

Thanks,
Tauseef

On Tue, 18 Jun 2024 at 18:55, Tauseef Janvekar 
wrote:

> Dear Team,
> As per https://issues.apache.org/jira/browse/FLINK-26088, elasticsearch 8
> support is already added but I do not see it in any documentation.
> Also the last version that supports any elasticsearch is 1.17.x.
>
> Can I get the steps on how to integrate with elastic 8 and some sample
> code would be appreciated.
>
> Thank you,
> Tauseef
>
>


Re: When inserting data with the HBase connector, how to update only one of multiple columns in a column family

2024-06-20 Thread 15868861416
Try referring to this example:
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proc_time` AS PROCTIME()
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hbase_dim (
  rowkey INT,
  family1 ROW,
  family2 ROW,
  family3 ROW
) WITH (
  'connector'='cloudhbase',
  'table-name'='',
  'zookeeper.quorum'=''
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  f1c1 INT,
  f3c3 STRING
) WITH (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink
  SELECT a, family1.col1 as f1c1, family3.col3 as f3c3 FROM datagen_source
  JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` as h ON
  datagen_source.a = h.rowkey;


| |
博星
|
|
15868861...@163.com
|


 Replied Message 
| From | xiaohui zhang |
| Date | 2024-06-20 10:03 |
| To |  |
| Subject | Re: When inserting data with the HBase connector, how to update only one of multiple columns in a column family |
When writing, Flink requires that all fields defined in the DDL be written at the same time; using only some of the fields in the SQL is not supported.
If you are sure you only need to write part of the data, define only the columns you actually use in the DDL.


zboyu0104 wrote on Fri, Jun 14, 2024 at 15:43:

How do I unsubscribe?
from Alibaba Mail
iPhone--
From: 谢县东
Date: 2024-06-06 16:07:05
To:
Subject: When inserting data with the HBase connector, how to update only one of multiple columns in a column family

Hello everyone,


Flink version: 1.13.6
I am using the flink-connector-hbase connector to write data into HBase via Flink SQL. The HBase table is created as follows:


CREATE TABLE hbase_test_db_test_table_xxd (
rowkey STRING,
cf1 ROW,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'test_db:test_table_t1',
'zookeeper.quorum' = 'xxx:2181',
'zookeeper.znode.parent' = '/hbase',
'null-string-literal' = '',
'sink.parallelism' = '2'
);


The HBase cf1 column family has three columns. According to the official website's example, inserting data requires building a ROW value (the ROW type must include all columns under the column family):
INSERT INTO hbase_test_db_test_table_xxd  select '002' as rowkey,
row('xxd_2', 'boy', '10') as cf1;




If I only want to update one of the columns, how can that be done? Do I need to create another HBase table definition in Flink?











Question about concurrentRequests in ElasticsearchWriter

2024-06-20 Thread 장석현
Hi,

I'm currently working with the ElasticsearchSink class in the Flink 
Elasticsearch connector. I noticed that in the createBulkProcessor method, 
setConcurrentRequests(0) is used, which makes the flush() operation blocking.

From my understanding, it seems that even if we set setConcurrentRequests to a 
non-zero value, the use of pendingActions should prevent any data loss. Could 
someone please explain the rationale behind making setConcurrentRequests 
blocking by setting it to 0? Why is this necessary, and what are the benefits 
compared to using a non-zero value?

Thank you in advance for your help!

Re: When inserting data with the HBase connector, how to update only one of multiple columns in a column family

2024-06-19 Thread xiaohui zhang
When writing, Flink requires that all fields defined in the DDL be written at the same time; using only some of the fields in the SQL is not supported.
If you are sure you only need to write part of the data, define only the columns you actually use in the DDL (a sketch follows below).
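
Not part of the original reply: a minimal sketch of that approach, assuming a second table definition that maps only the single qualifier to be updated. The column name col1 is a hypothetical placeholder, and the WITH options mirror the question quoted below.

CREATE TABLE hbase_single_col (
  rowkey STRING,
  cf1 ROW<col1 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'test_db:test_table_t1',
  'zookeeper.quorum' = 'xxx:2181',
  'zookeeper.znode.parent' = '/hbase'
);

-- The resulting HBase put only touches cf1:col1; the other qualifiers in the
-- column family are left as they are.
INSERT INTO hbase_single_col SELECT '002' AS rowkey, ROW('xxd_2') AS cf1;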


zboyu0104 wrote on Fri, Jun 14, 2024 at 15:43:

> How do I unsubscribe?
> from Alibaba Mail
> iPhone--
> From: 谢县东
> Date: 2024-06-06 16:07:05
> To:
> Subject: When inserting data with the HBase connector, how to update only one of multiple columns in a column family
>
> Hello everyone,
>
>
> Flink version: 1.13.6
> I am using the flink-connector-hbase connector to write data into HBase via Flink SQL. The HBase table is created as follows:
>
>
> CREATE TABLE hbase_test_db_test_table_xxd (
> rowkey STRING,
> cf1 ROW,
> PRIMARY KEY (rowkey) NOT ENFORCED
> ) WITH (
> 'connector' = 'hbase-2.2',
> 'table-name' = 'test_db:test_table_t1',
> 'zookeeper.quorum' = 'xxx:2181',
> 'zookeeper.znode.parent' = '/hbase',
> 'null-string-literal' = '',
> 'sink.parallelism' = '2'
> );
>
>
> The HBase cf1 column family has three columns. According to the official website's example, inserting data requires building a ROW value (the ROW type must include all columns under the column family):
> INSERT INTO hbase_test_db_test_table_xxd  select '002' as rowkey,
> row('xxd_2', 'boy', '10') as cf1;
>
>
>
>
> If I only want to update one of the columns, how can that be done? Do I need to create another HBase table definition in Flink?
>
>
>
>
>
>
>
>
>


Re: How can Flink dynamically join n of multiple dimension tables?

2024-06-19 Thread xiaohui zhang
A lookup join can join multiple dimension tables, but updates to a dimension table will not trigger a refresh of historical data.
When joining several dimension tables, consider the latency added by the repeated lookups and the query TPS pressure on the dimension-table databases. (A sketch follows below.)
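
Not part of the original reply: a minimal sketch of a processing-time lookup join against two dimension tables; all table and column names are hypothetical.

-- fact_stream has a processing-time attribute `proc_time`; dim_1 and dim_2 are
-- lookup-capable tables (for example JDBC). Each row is enriched with the
-- dimension values as they are at lookup time; later dimension updates do not
-- rewrite rows that were already emitted.
SELECT f.id, f.amount, d1.name, d2.category
FROM fact_stream AS f
LEFT JOIN dim_1 FOR SYSTEM_TIME AS OF f.proc_time AS d1 ON f.dim1_id = d1.id
LEFT JOIN dim_2 FOR SYSTEM_TIME AS OF f.proc_time AS d2 ON f.dim2_id = d2.id;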

斗鱼 <1227581...@qq.com.invalid> wrote on Wed, Jun 19, 2024 at 23:12:

> OK, thanks for the reply. I had heard that Flink's lookup join can implement similar logic; I just don't know whether a lookup join supports multiple dynamic dimension tables.
>
>
> 斗鱼
> 1227581...@qq.com
>
>
>
> 
>
>
>
>
> ------------------ Original Message ------------------
> From: "user-zh" <xhzhang...@gmail.com>;
> Sent: Wednesday, June 19, 2024 17:55
> To: "user-zh"
> Subject: Re: How can Flink dynamically join n of multiple dimension tables?
>
>
>
>
> Do you need to refresh the historical fact table after a dimension table is updated? Doing that with Flink is almost impossible, especially with multiple dimension tables involved: every time a dimension table is updated, you would have to find the related records in the entire history and rewrite them. Whether in state storage or in update volume, the resources required are too high to handle.
> In our current real-time wide-table applications, the real-time part is generally transactional flow data, and the dimension values fetched should be the data as of the time the business event occurred.
> Refreshing the facts after a dimension update is generally done in a nightly batch. If there is a hard real-time update requirement, the only option is to join the dimension table for the latest value at query time.
>
> 王旭
>  Happy to exchange ideas; we are doing a similar refactor.
>  1. If you are not sure how many dimension tables need to be joined, could you simply join all of them, and then decide which dimension tables' data to take based on the fields in the driving record, similar to a left join?
> 
> 
> 2. For the scenario where the result table must also be refreshed after a dimension table changes: you mentioned the dimension data is at the hundreds-of-millions level, so the fact data is presumably even larger. If you have to reverse-join against the full fact table, stream processing does not feel like a good fit; if only part needs refreshing, you could temporarily stage the last n days of data in external storage.
> 
> 
> 
  Replied Message 
 | From | 斗鱼<1227581...@qq.com.INVALID |
 | Date | 2024-06-16 21:08 |
 | To | user-zh  | Cc | |
 | Subject | Re: How can Flink dynamically join n of multiple dimension tables? |
> 
> 
 We are still in the research stage; either SQL or DataStream is fine. Our DWD and dimension tables are currently planned to live in ClickHouse/Doris. We are designing the future architecture and have not implemented it yet; we just want to learn from everyone's experience. Any guidance would be appreciated.
> 
> 
>  斗鱼
>  1227581...@qq.com
> 
> 
> 
> 
> 
> 
> 
 ------------------ Original Message ------------------
 From: "user-zh" <xwwan...@163.com>;
 Sent: Sunday, June 16, 2024 21:03
 To: "user-zh"
 Subject: Re: How can Flink dynamically join n of multiple dimension tables?
> 
> 
> 
 Hi, may I ask whether you are implementing this scenario with the Flink SQL API or the DataStream API?
> 
> 
> 
  Replied Message 
 | From | 斗鱼<1227581...@qq.com.INVALID> |
 | Date | 2024-06-16 20:35 |
 | To | user-zh  | Cc | |
 | Subject | How can Flink dynamically join n of multiple dimension tables? |
 I'd like to ask for advice. We currently have the following scenario:
 1. While writing data into the DWD fact table, we also write that DWD record's information to Kafka.
 2. The Kafka message contains a string array of dimension-table type identifiers.
> 
> 
 3. Flink consumes the Kafka data in real time and joins different dimension tables according to the type array. For example, if the array contains [1, 2], then after reading the Kafka message Flink joins the DWD data with dimension table 1 and dimension table 2 and writes the result into the DWS table.
> 
> 
> 
> 
 I would like to ask how to dynamically join dimension tables based on that array. These dimension tables are quite large, at the hundreds-of-millions level, and the joined DWS data needs to change when a dimension table changes. Is there a technical solution that can achieve this? If so, please share a simple example or a reference link. Thanks!
> 
> 
> 
> 
> 
> 
> 
>  |
>  |
>  斗鱼
>  1227581...@qq.com
>  |


Re:Re: Checkpoints and windows size

2024-06-19 Thread Feifan Wang
Hi banu:


> Not all old sst files are present. Few are removed (i think it is because of 
> compaction).


You are right: RocksDB implements deleting a key by inserting an entry with a null
value (a tombstone), and the space is only released after compaction.


> Now how can I maintain check points size under control??.


Since RocksDB incremental checkpoints upload the SST files directly (native format), they
generally show some space amplification. Another factor that affects the checkpoint size with
incremental checkpoints is RocksDB's data compression. In general, it is difficult to estimate
the checkpoint size accurately.
A savepoint in canonical format truly reflects the size of the state data; however, taking such
a savepoint is more expensive because all state data must be reorganized into the canonical format.
I still think RocksDB incremental checkpoints are the better choice: although the checkpoint size
cannot be calculated precisely, it is generally positively correlated with the actual state size.
Once the job runs stably, the checkpoint size fluctuates within a certain range (due to compaction),
which is easy to observe during the pre-launch testing phase. Maybe someone with more
experience can give more valuable advice.
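
In case it helps, a rough sketch of how incremental RocksDB checkpoints are typically enabled, written as Flink SQL client SET statements (the same keys can go into flink-conf.yaml; the checkpoint directory is a made-up example path, and on newer Flink versions the backend key may be 'state.backend.type'):

SET 'state.backend' = 'rocksdb';
SET 'state.backend.incremental' = 'true';   -- upload only new or changed SST files per checkpoint
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';   -- example path, adjust to your environment

With these settings, the checkpointed data size reported in the checkpoint statistics reflects the uploaded SST files (compressed, possibly amplified) rather than the canonical state size, as described above.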


> How can I find the idle checkpoint size of my project? I found the link below, but
> it does not talk about parallelism.


What do you mean by "idle checkpoint size"?



——

Best regards,

Feifan Wang




On 2024-06-19 17:35:13, "banu priya" wrote:

Hi Wang,


Thanks a lot for your reply.


Currently I have a 2s window and a checkpoint interval of 10s. The minimum pause
between checkpoints is 5s. What happens is that my checkpoint size is growing
gradually. I checked the contents of my RocksDB local dir and also the
shared checkpoints directory. Inside chk-x, I have a _metadata file which lists
the .sst files referenced by that checkpoint.


In that list I can see that a very old .sst file is still present. I was expecting
it to be cleaned up.


Not all old .sst files are still there; a few have been removed (I think because of
compaction).


Now, how can I keep the checkpoint size under control?


How can I find the idle checkpoint size of my project? I found the link below, but it
does not talk about parallelism.


https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines


Any help would really be appreciated :).


Thanks
Banu


On Wed, 19 Jun, 2024, 9:38 am banu priya,  wrote:

Hi All, 


I have a Flink job with a keyBy, a tumbling window (2 sec window time &
processing time), and an aggregator.


How often should I run the checkpoint? I don't need the data to be retained
after 2s.


I want to use incremental checkpoints with RocksDB.




Thanks
Banupriya 

Re:Checkpoints and windows size

2024-06-19 Thread Feifan Wang


Hi banu,


First of all, note that the checkpoint interval does not affect how long the window
operator's state data lives. The life cycle of the state data is tied to the life cycle
of the tumbling window itself.
A checkpoint is a consistent snapshot of the job (including state data and other
information); what the interval really affects is failover: the longer the checkpoint
interval, the older the checkpoint used during failover may be relative to the current
time, and the more data needs to be reprocessed. Too short an interval may result in
excessive checkpointing overhead, so this is a trade-off.


My personal suggestion is to set the checkpoint interval to 5 minutes when
using RocksDB incremental checkpoints. You can also make your own choice based
on the trade-offs mentioned above.
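
A rough sketch of those settings as Flink SQL client SET statements (the same options can be set in flink-conf.yaml or via the DataStream API; the 5-minute interval is just the suggestion above, and the min-pause value is an arbitrary example):

SET 'execution.checkpointing.interval' = '5min';   -- suggested starting point, tune for your job
SET 'execution.checkpointing.min-pause' = '1min';  -- example value; guarantees some breathing room between checkpoints
SET 'state.backend' = 'rocksdb';                   -- on newer Flink versions the key may be 'state.backend.type'
SET 'state.backend.incremental' = 'true';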



——

Best regards,

Feifan Wang




At 2024-06-19 12:08:57, "banu priya"  wrote:

Hi All, 


I have a Flink job with a keyBy, a tumbling window (2 sec window time &
processing time), and an aggregator.


How often should I run the checkpoint? I don't need the data to be retained
after 2s.


I want to use incremental checkpoints with RocksDB.




Thanks
Banupriya 

Re: How can Flink dynamically join n of many dimension tables?

2024-06-19 Thread xiaohui zhang
Do you want the historical fact data to be refreshed after a dimension table is updated? That is almost impossible to do with Flink, especially with multiple dimension tables involved: every time a dimension table changes, you would have to find all related records in the entire history and rewrite them. Whether in terms of state storage or the volume of updates, the required resources are far too high.
In our current real-time wide-table applications, the real-time part is generally flow/transaction data, and the dimension values looked up should be the values at the moment the business event occurred.
Refreshing fact data after dimension updates is usually done in nightly batch jobs. If there is a hard real-time requirement, the only option is to join the dimension table again at query time to get the latest values.

王旭 wrote on Sun, Jun 16, 2024 at 21:20:

> Glad to exchange ideas; we are doing a similar refactoring.
> 1. If you are not sure how many dimension tables need to be joined, could you simply join all of them and then use the fields in the driving record to decide which dimension tables' data to take, similar to a left join?
>
> 2. For the requirement that the result table must be refreshed when a dimension table changes: you mentioned the dimension tables are at the hundred-million-row scale, so the fact table is presumably even larger. Re-joining the full fact table backwards does not seem like a good fit for stream processing; if only part of it needs refreshing, you could stage the last n days of data in an external store.
>
>
> -------- Replied message --------
> From: 斗鱼 <1227581...@qq.com.INVALID>
> Date: 2024-06-16 21:08
> To: user-zh
> Subject: Re: How can Flink dynamically join n of many dimension tables?
>
> We are still in the research phase; either SQL or DataStream would work. Our DWD and dimension tables currently live in ClickHouse/Doris. We are designing the future architecture and have not implemented anything yet; we are just hoping to learn from you all. Thanks for the guidance.
>
>
> 斗鱼
> 1227581...@qq.com
>
>
> -------- Original message --------
> From: "user-zh" <xwwan...@163.com>
> Sent: Sunday, June 16, 2024, 21:03
> To: "user-zh"
> Subject: Re: How can Flink dynamically join n of many dimension tables?
>
> Hi, are you implementing this scenario with the Flink SQL API or the DataStream API?
>
>
> -------- Replied message --------
> From: 斗鱼 <1227581...@qq.com.INVALID>
> Date: 2024-06-16 20:35
> To: user-zh
> Subject: How can Flink dynamically join n of many dimension tables?
>
> We are facing the following scenario:
> 1. While writing data into a DWD fact table, we also write a message about that DWD record to Kafka.
> 2. The Kafka message contains a string array of dimension-table type identifiers.
>
> 3. Flink consumes the Kafka data in real time and, based on the type array, joins the corresponding dimension tables. For example, if the array contains [1, 2], Flink reads the Kafka message, joins the DWD data with dimension table 1 and dimension table 2, and writes the result to a DWS table.
>
> How can we dynamically join dimension tables based on this array? These dimension tables are all quite large (hundreds of millions of rows), and when a dimension table changes, the joined DWS data must change accordingly. Is there a technical solution for this? If so, a simple example or a reference link would be much appreciated. Thanks!
>
>
> 斗鱼
> 1227581...@qq.com
>


Re: Checkpoints and windows size

2024-06-19 Thread banu priya
Hi Wang,

Thanks a lot for your reply.

Currently I have a 2s window and a checkpoint interval of 10s. The minimum pause
between checkpoints is 5s. What happens is that my checkpoint size is growing
gradually. I checked the contents of my RocksDB local dir and also the
shared checkpoints directory. Inside chk-x, I have a _metadata file which
lists the .sst files referenced by that checkpoint.

In that list I can see that a very old .sst file is still present. I was
expecting it to be cleaned up.

Not all old .sst files are still there; a few have been removed (I think because
of compaction).

Now, how can I keep the checkpoint size under control?

How can I find the idle checkpoint size of my project? I found the link below, but
it does not talk about parallelism.

https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines

Any help would really be appreciated :).

Thanks
Banu

On Wed, 19 Jun, 2024, 9:38 am banu priya,  wrote:

> Hi All,
>
> I have a Flink job with a keyBy, a tumbling window (2 sec window time &
> processing time), and an aggregator.
>
> How often should I run the checkpoint? I don't need the data to be
> retained after 2s.
>
> I want to use incremental checkpoints with RocksDB.
>
>
> Thanks
> Banupriya
>


Re: A way to meter number of deserialization errors

2024-06-19 Thread David Radley
Hi Ilya,
I have not got any experience of doing this, but I wonder if we could use the
Flink metrics system. I wonder:
- There could be a hook point at that part of the code to plug in some custom
code that implements the metrics.
- A better way might be to throw a new unable-to-parse exception that could be
caught in non-protobuf code, where a counter metric could then be incremented. I guess the
metrics would be scoped to a job, like the watermark metrics.
- I wondered if you might also want to count how many messages were successfully parsed
in a non-protobuf layer (at the dynamic table sort of level).
- Do you know why some messages do not parse? It would be interesting to know the
types of non-parsable messages, I assume with the idea of understanding and
potentially fixing the source / parser limitations.

Kind regards, David.

From: Ilya Karpov 
Date: Monday, 17 June 2024 at 12:39
To: user 
Subject: [EXTERNAL] A way to meter number of deserialization errors
Hi all, we are planning to use flink as a connector between kafka and external 
systems. We use protobuf as a message format in kafka. If non-backward 
compatible changes occur we want to skip those messages ('protobuf. 
ignore-parse-errors'

Hi all,
we are planning to use flink as a connector between kafka and external systems. 
We use protobuf as a message format in kafka. If non-backward compatible 
changes occur we want to skip those messages ('protobuf.ignore-parse-errors' = 
'true') but record an error and raise an alert. I didn't find any way to record 
deserialization errors 
(https://github.com/apache/flink/blob/3a15d1ce69ac21d619f60033ec45cae303489c8f/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java#L73),
 does it exist?


