RE: Flink bulk and record file source format metrics

2023-06-18 Thread Kamal Mittal via user
Thanks for the approach, but a few follow-up points regarding my earlier query –

A Parquet file is a binary file, so when I said "corrupt record", isn't it the
complete file itself that can't be processed?
So would this not be counting corrupt records, but rather corrupt files or
splits in Flink?

From: Ken Krugler 
Sent: 16 June 2023 08:19 PM
To: user@flink.apache.org
Cc: Shammon FY ; Kamal Mittal 
Subject: Re: Flink bulk and record file source format metrics

Hi Kamal,

In a similar situation, when a decoding failure happened I would generate a 
special record that I could then detect/filter out (and increment a counter) in 
a FilterFunction immediately following the source.
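
A minimal sketch of this pattern (the wrapper type and its "corrupt" marker are
placeholders, not an actual implementation):

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Drops the special records emitted on decode failure and counts them.
// MaybeCorrupt is a placeholder wrapper type, not part of any Flink API.
public class DropCorruptRecords extends RichFilterFunction<DropCorruptRecords.MaybeCorrupt> {

    public static class MaybeCorrupt {
        public boolean corrupt;  // set by the reader when decoding failed
        public byte[] payload;   // the decoded record bytes, or null on failure
    }

    private transient Counter corruptRecords;

    @Override
    public void open(Configuration parameters) {
        corruptRecords = getRuntimeContext().getMetricGroup().counter("corruptRecords");
    }

    @Override
    public boolean filter(MaybeCorrupt record) {
        if (record.corrupt) {
            corruptRecords.inc();
            return false; // drop the special record
        }
        return true;
    }
}

The filter is attached right after the source, e.g. stream.filter(new DropCorruptRecords()).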

— Ken



On Jun 16, 2023, at 2:11 AM, Kamal Mittal via user <user@flink.apache.org> wrote:

Hello,

Any way-forward, please suggest.

Rgds,
Kamal

From: Kamal Mittal via user <user@flink.apache.org>
Sent: 15 June 2023 10:39 AM
To: Shammon FY <zjur...@gmail.com>
Cc: user@flink.apache.org
Subject: RE: Flink bulk and record file source format metrics

Hello,

I need a counter metric for the number of corrupt records encountered while
decoding Parquet records at the data source level. I know where the
corrupt-record handling needs to happen, but because no “SourceContext” or
“RuntimeContext” is available there, I am unable to register any metric.

It is needed in the same way that the “SourceReaderBase” class maintains a
counter for the number of records emitted.
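
Registering such a counter only needs a metric group handle; a sketch of what
that would look like if one were reachable from the reader (e.g. via
SourceReaderContext.metricGroup(), which is exactly what is not exposed in the
decoding code today):

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Sketch only: 'metricGroup' would have to be passed in from a place
// where Flink exposes it, e.g. the source reader.
public class CorruptRecordMetrics {

    private final Counter corruptRecords;

    public CorruptRecordMetrics(MetricGroup metricGroup) {
        this.corruptRecords = metricGroup.counter("numCorruptRecords");
    }

    public void onCorruptRecord() {
        corruptRecords.inc();
    }
}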

Rgds,
Kamal

From: Shammon FY <zjur...@gmail.com>
Sent: 14 June 2023 05:33 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: Flink bulk and record file source format metrics

Hi Kamal,

Can you give more information about the metrics you want? In Flink, each source
task has one source reader, which already exposes some metrics; you can refer
to the metrics docs [1] for more detailed information.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/

Best,
Shammon FY

On Tue, Jun 13, 2023 at 11:13 AM Kamal Mittal via user <user@flink.apache.org> wrote:
Hello,

I am using the Flink record stream format file source API as below for reading
Parquet records.

FileSource.FileSourceBuilder source = 
FileSource.forRecordStreamFormat(streamformat, path);
source.monitorContinuously(Duration.ofMillis(1));

I want to log/generate metrics for corrupt records, and for that I need to
register Flink metrics at the source level in the Parquet reader class. Is
there any way to do that, given that right now no handle to SourceContext is
available?
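
For context, the full source wiring looks roughly like this (a sketch; MyRecord
and streamformat stand for the actual record type and StreamFormat instance):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileSource<MyRecord> source =
        FileSource.forRecordStreamFormat(streamformat, path)  // StreamFormat<MyRecord>
                .monitorContinuously(Duration.ofMillis(1))
                .build();

DataStream<MyRecord> records =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-file-source");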

Rgds,
Kamal

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Flink CDC consuming MySQL `0000-00-00 00:00:00` time values

2023-06-18 Thread casel.chen
The Flink job consumes data from a MySQL business table that uses the dummy
date `0000-00-00 00:00:00` to store times. When consumed directly with Flink
CDC, this value is parsed as `1970-01-01 00:00:00` (for the MySQL datetime
type) or `1970-01-01 08:00:00` (for the MySQL timestamp type).
Question 1: Could Flink CDC offer an option to convert such dummy times to
NULL? And do the initial snapshot query and the incremental binlog consumption
handle this dummy time consistently?
Question 2: In a mysql -> mysql sync scenario, Flink CDC cannot synchronize the
dummy date `0000-00-00 00:00:00` for the timestamp type, because Flink CDC
converts it to `1970-01-01 08:00:00` CST, which corresponds to `1970-01-01
00:00:00` UTC, while the MySQL documentation [1] defines the timestamp range as
'1970-01-01 00:00:01' UTC to '2038-01-19 03:14:07' UTC, so `1970-01-01
08:00:00` CST is treated as invalid.


[1] https://dev.mysql.com/doc/refman/5.7/en/datetime.html
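
One possible downstream workaround while no such Flink CDC option exists (a
sketch only; the Order POJO and its updateTime field are invented for
illustration) is to map the sentinel value back to NULL right after the source:

import org.apache.flink.api.common.functions.MapFunction;

import java.time.LocalDateTime;

// Normalizes the dummy date after the CDC source.
// 'Order' and 'updateTime' are illustrative, not from the actual job.
public class DummyDateToNull implements MapFunction<Order, Order> {

    // Value that, per this thread, Flink CDC produces for MySQL's
    // `0000-00-00 00:00:00` in a DATETIME column.
    private static final LocalDateTime SENTINEL = LocalDateTime.of(1970, 1, 1, 0, 0);

    @Override
    public Order map(Order order) {
        if (SENTINEL.equals(order.updateTime)) {
            order.updateTime = null; // treat the dummy date as missing
        }
        return order;
    }
}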

Re-interpreting Kafka source as a keyed stream?

2023-06-18 Thread Piotr Domagalski
Hi,

I was wondering if it would be safe for me to make use of
reinterpretAsKeyedStream on a Kafka source in order to have an
"embarrassingly parallel" job without any .keyBy().

My Kafka topic is partitioned by the same id that the session window operator
is keyed on. Therefore, in theory, there is no need for data to be transferred
between subtasks (between the Kafka source and the windowing operator). Is it
possible to avoid this shuffle by using reinterpretAsKeyedStream on the source?
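
For reference, the call in question is roughly the following (a sketch; Event,
getId() and the reduce step are placeholders for the real types and session
logic):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// 'events' is the DataStream coming straight from the Kafka source.
KeyedStream<Event, String> keyed =
        DataStreamUtils.reinterpretAsKeyedStream(events, Event::getId);

DataStream<Event> sessions =
        keyed.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
             .reduce((a, b) -> b);  // stand-in for the real session logic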

I'm worried about the warning from the docs saying:

WARNING: The re-interpreted data stream MUST already be pre-partitioned in
EXACTLY the same way Flink’s keyBy would partition the data in a shuffle
w.r.t. key-group assignment.

Of course, the partitioning in Kafka will not be *exactly* the same... What
problems might this cause? I did it on a very small subset of data and
didn't notice any issues.
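
One way to see why the two partitionings differ: Flink routes a key to a key
group and then to a subtask via KeyGroupRangeAssignment, which is unrelated to
Kafka's partitioner. A small check (a sketch, assuming flink-runtime is on the
classpath) of where keyBy would send a given key:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyByRoutingCheck {

    public static void main(String[] args) {
        int maxParallelism = 128;  // Flink's default for small parallelisms
        int parallelism = 4;       // job parallelism, matching the Kafka partition count here

        String key = "some-session-id";
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        // Kafka's default partitioner hashes the serialized key bytes differently,
        // so this subtask index will generally not match the Kafka partition for the same key.
        System.out.println("keyBy would route key '" + key + "' to subtask " + subtask);
    }
}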

-- 
Piotr Domagalski