Re: Flink File Source: File read strategy

2023-09-24 Thread Shammon FY
Hi Kirti,

I think you can refer to the doc [1] and create a table backed by your S3 file
system (put your S3 path in the `path` option), then submit jobs to write and
read data with S3.

You can refer to [2] if your jobs use the `DataStream` API.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
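
For illustration, a minimal sketch of what [1] describes — the bucket name, schema,
and format below are made-up placeholders, and an S3 filesystem plugin must be on
the classpath as described in [2]:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class S3FilesystemTableExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Filesystem connector table whose `path` points at S3.
        tableEnv.executeSql(
                "CREATE TABLE s3_events ("
                        + "  id BIGINT,"
                        + "  payload STRING"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 's3://my-bucket/events/',"
                        + "  'format' = 'json'"
                        + ")");

        // Write to and read back from S3 through the table.
        tableEnv.executeSql("INSERT INTO s3_events VALUES (1, 'hello')");
        tableEnv.executeSql("SELECT * FROM s3_events").print();
    }
}
```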

Best,
Shammon FY

On Mon, Sep 25, 2023 at 12:36 PM Kirti Dhar Upadhyay K <
kirti.k.dhar.upadh...@ericsson.com> wrote:

> Thanks Shammon.
>
> Is there any way to verify that File Source reads files directly from S3?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>
> *From:* Shammon FY 
> *Sent:* 25 September 2023 06:27
> *To:* Kirti Dhar Upadhyay K 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink File Source: File read strategy
>
>
>
> Hi Kirti,
>
>
>
> I think the default file `Source` does not download files locally
> in Flink, but reads them directly from S3. However, Flink also supports
> configuring temporary directories through `io.tmp.dirs`. If it is a
> user-defined source, the temporary directory can be obtained from
> FlinkS3FileSystem. After the Flink job is completed, the directory will be
> cleaned up.
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Fri, Sep 22, 2023 at 3:11 PM Kirti Dhar Upadhyay K via user <
> user@flink.apache.org> wrote:
>
> Hi Community,
>
>
>
> I am using Flink File Source with Amazon S3.
>
> Please help me on below questions-
>
>
>
>    1. When the Split Enumerator assigns a split to the Source Reader, does it
>    download the file temporarily and then start reading/decoding the records
>    from the file, or does it create a direct stream with S3?
>
>    2. If it is downloaded locally, then to which path? Is it configurable?
>
>    3. Does this temporary file get deleted automatically, or is explicit
>    cleanup required?
>
>
>
>
>
> Regards,
>
> Kirti Dhar
>
>


Re: 1.17.1 - NPE during interval join

2023-09-24 Thread Phoes Huang
Hi Hangxiang,

Thanks for your response.
Below is the key code for this issue. The main_stream table is a streaming data source,
with events arriving roughly every 500 ms to 1 s. Currently, applying
assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks()) to t1minStream and
t5minStream avoids this problem, so the job no longer fails, but the output then loses data.
If you have any other ideas, I would appreciate them.

String t1minSql = "SELECT rowtime, key, id, AVG(num) OVER w_t1min AS avg_t1min "
        + "FROM main_stream WINDOW w_t1min AS (PARTITION BY key ORDER BY rowtime "
        + "RANGE BETWEEN INTERVAL '1' MINUTES PRECEDING AND CURRENT ROW)";

Table t1minTable = tableEnv.sqlQuery(t1minSql);

String t5minSql = "SELECT rowtime, key, id, AVG(num) OVER w_t5min AS avg_t5min "
        + "FROM main_stream WINDOW w_t5min AS (PARTITION BY key ORDER BY rowtime "
        + "RANGE BETWEEN INTERVAL '5' MINUTES PRECEDING AND CURRENT ROW)";

Table t5minTable = tableEnv.sqlQuery(t5minSql);

DataStream<Row> t1minStream = tableEnv.toChangelogStream(t1minTable);

DataStream<Row> t5minStream = tableEnv.toChangelogStream(t5minTable);

DataStream<Row> joinedStream = t1minStream
        .keyBy(new TupleKeySelector("key", "id"))
        .intervalJoin(t5minStream.keyBy(new TupleKeySelector("key", "id")))
        .inEventTime()
        .between(Time.milliseconds(-1000L), Time.milliseconds(1000L))
        .process(new ProcessJoinFunction<Row, Row, Row>() {
            @Override
            public void processElement(Row left, Row right, Context ctx, Collector<Row> collector)
                    throws Exception {
                collector.collect(Row.join(left, right));
            }
        });
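
(For reference, the noWatermarks workaround mentioned above is not shown in the code;
a rough sketch of where it would be applied, assuming the same tableEnv and tables:)

```java
// Hypothetical sketch of the workaround: drop watermarks on the changelog streams
// before the interval join (this avoids the NPE here, at the cost of incomplete output).
DataStream<Row> t1minStreamNoWm = tableEnv.toChangelogStream(t1minTable)
        .assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks());
DataStream<Row> t5minStreamNoWm = tableEnv.toChangelogStream(t5minTable)
        .assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks());
```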



> Hangxiang Yu wrote on Sep 25, 2023, at 10:54 AM:
> 
> Hi, is this a SQL job or a DataStream job? Could you provide some key SQL or code that reproduces the issue?
> 
> On Sat, Sep 23, 2023 at 3:59 PM Phoes Huang  wrote:
> 
>> Hi,
>> 
>> I hit this problem when running locally on a single machine during development. Has anyone encountered and solved it?
>> 
>> 2023-09-23 13:52:03.989 INFO
>> [flink-akka.actor.default-dispatcher-9][Execution.java:1445] - Interval
>> Join (19/20)
>> (ff8e25fb94208d3c27f549a1e24757ea_e8388ada9c03cfdb1446bb3ccfbd461b_18_0)
>> switched from RUNNING to FAILED on d569c5db-6882-496b-9e92-8a40bb631784 @
>> localhost (dataPort=-1).
>> java.lang.NullPointerException: null
>>at
>> org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:149)
>> ~[flink-streaming-java-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:39)
>> ~[flink-streaming-java-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.lambda$logRemoval$1(ChangelogKeyGroupedPriorityQueue.java:153)
>> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.state.changelog.AbstractStateChangeLogger.lambda$serialize$4(AbstractStateChangeLogger.java:184)
>> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.state.changelog.AbstractStateChangeLogger.serializeRaw(AbstractStateChangeLogger.java:193)
>> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.state.changelog.AbstractStateChangeLogger.serialize(AbstractStateChangeLogger.java:178)
>> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.state.changelog.AbstractStateChangeLogger.log(AbstractStateChangeLogger.java:151)
>> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.state.changelog.AbstractStateChangeLogger.valueElementRemoved(AbstractStateChangeLogger.java:125)
>> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.logRemoval(ChangelogKeyGroupedPriorityQueue.java:153)
>> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.poll(ChangelogKeyGroupedPriorityQueue.java:69)
>> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:301)
>> ~[flink-streaming-java-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
>> ~[flink-streaming-java-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602)
>> ~[flink-streaming-java-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:609)
>> ~[flink-streaming-java-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:618)
>> ~[flink-streaming-java-1.17.1.jar:1.17.1]
>>at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitWatermark(StreamTwoInputProcessorFactory.java:268)
>> ~[flink-streaming-java-1.17.1.jar:1.17.1]
>>at
>> org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200)
>> 

RE: Flink File Source: File read strategy

2023-09-24 Thread Kirti Dhar Upadhyay K via user
Thanks Shammon.
Is there any way to verify that File Source reads files directly from S3?

Regards,
Kirti Dhar

From: Shammon FY 
Sent: 25 September 2023 06:27
To: Kirti Dhar Upadhyay K 
Cc: user@flink.apache.org
Subject: Re: Flink File Source: File read strategy

Hi Kirti,

I think the default file `Source` does not download files locally in Flink, but
reads them directly from S3. However, Flink also supports configuring temporary
directories through `io.tmp.dirs`. If it is a user-defined source, the temporary
directory can be obtained from FlinkS3FileSystem. After the Flink job is
completed, the directory will be cleaned up.

Best,
Shammon FY

On Fri, Sep 22, 2023 at 3:11 PM Kirti Dhar Upadhyay K via user
<user@flink.apache.org> wrote:
Hi Community,

I am using Flink File Source with Amazon S3.
Please help me on below questions-


  1.  When the Split Enumerator assigns a split to the Source Reader, does it
download the file temporarily and then start reading/decoding the records from
the file, or does it create a direct stream with S3?

  2.  If it is downloaded locally, then to which path? Is it configurable?

  3.  Does this temporary file get deleted automatically, or is explicit cleanup
required?


Regards,
Kirti Dhar


Re: Flink SQL state cleanup

2023-09-24 Thread Jane Chan
Hi,

You can control the state TTL of stateful operators by setting table.exec.state.ttl. See [1] for more details.

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86
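
As a minimal sketch (assuming a recent Flink release), the same option can also be
set from the Java Table API; the class name and values below are only illustrative:

```java
import java.time.Duration;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StateTtlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Typed setter for idle state retention (backed by table.exec.state.ttl)...
        tEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));

        // ...or set the raw configuration key directly, as in the PyFlink reply quoted below.
        tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "86400 s");
    }
}
```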

Best,
Jane

On Thu, Sep 21, 2023 at 5:17 PM faronzz  wrote:

> Try this: t_env.get_config().set("table.exec.state.ttl", "86400 s")
>
>
> faronzz
> faro...@163.com
>
> ---- Original Message ----
> From: 小昌同学
> Sent: 2023-09-21 17:06
> To: user-zh
> Subject: Flink SQL state cleanup
>
> Hi all, I'd like to ask about state cleanup in Flink SQL. Searching online, I only
> found the related mini-batch settings; is there really no state TTL configuration
> for SQL?
>
> 小昌同学
> ccc0606fight...@163.com


Re: Unsubscribe

2023-09-24 Thread Yunfeng Zhou
Hi,

Please send an email with any content to user-zh-unsubscr...@flink.apache.org to
unsubscribe from the user-zh@flink.apache.org mailing list; you can refer to [1][2]
for how to manage your mailing list subscriptions.

Best,
Yunfeng

On Mon, Sep 25, 2023 at 10:43 AM 星海 <2278179...@qq.com.invalid> wrote:
>
> Unsubscribe


Re: Side outputs documentation

2023-09-24 Thread Yunfeng Zhou
Hi Alexis,

If you create an OutputTag with the constructor `OutputTag(String id)`,
you need to make it an anonymous subclass so that Flink can analyze the type
information. But if you use the constructor `OutputTag(String id,
TypeInformation typeInfo)`, you do not need to make it anonymous, since you
have already provided the type information.
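
A small sketch of the two constructors (tag names and element types here are arbitrary):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.OutputTag;

public class OutputTagExamples {
    // Anonymous subclass (note the trailing {}) so Flink can extract T = String from the generics.
    static final OutputTag<String> LATE_ANONYMOUS = new OutputTag<String>("late-data") {};

    // Explicit TypeInformation; no anonymous subclass is needed here.
    static final OutputTag<String> LATE_EXPLICIT = new OutputTag<>("late-data", Types.STRING);
}
```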

The second constructor was introduced after the documentation and the first
constructor, so the documentation may be outdated and no longer match
OutputTag's current behavior. A ticket and PR could be opened to fix the
documentation. What do you think?

Best,
Yunfeng

On Fri, Sep 22, 2023 at 4:55 PM Alexis Sarda-Espinosa
 wrote:
>
> Hello,
>
> very quick question, the documentation for side outputs states that an 
> OutputTag "needs to be an anonymous inner class, so that we can analyze the 
> type" (this is written in a comment in the example). Is this really true? 
> I've seen many examples where it's a static element and it seems to work fine.
>
> Regards,
> Alexis.
>


Re: 1.17.1 - NPE during interval join

2023-09-24 Thread Hangxiang Yu
Hi, is this a SQL job or a DataStream job? Could you provide some key SQL or code that reproduces the issue?

On Sat, Sep 23, 2023 at 3:59 PM Phoes Huang  wrote:

> Hi,
>
> I hit this problem when running locally on a single machine during development. Has anyone encountered and solved it?
>
> 2023-09-23 13:52:03.989 INFO
> [flink-akka.actor.default-dispatcher-9][Execution.java:1445] - Interval
> Join (19/20)
> (ff8e25fb94208d3c27f549a1e24757ea_e8388ada9c03cfdb1446bb3ccfbd461b_18_0)
> switched from RUNNING to FAILED on d569c5db-6882-496b-9e92-8a40bb631784 @
> localhost (dataPort=-1).
> java.lang.NullPointerException: null
> at
> org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:149)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:39)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.lambda$logRemoval$1(ChangelogKeyGroupedPriorityQueue.java:153)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.AbstractStateChangeLogger.lambda$serialize$4(AbstractStateChangeLogger.java:184)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.AbstractStateChangeLogger.serializeRaw(AbstractStateChangeLogger.java:193)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.AbstractStateChangeLogger.serialize(AbstractStateChangeLogger.java:178)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.AbstractStateChangeLogger.log(AbstractStateChangeLogger.java:151)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.AbstractStateChangeLogger.valueElementRemoved(AbstractStateChangeLogger.java:125)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.logRemoval(ChangelogKeyGroupedPriorityQueue.java:153)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.poll(ChangelogKeyGroupedPriorityQueue.java:69)
> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:301)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:609)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:618)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitWatermark(StreamTwoInputProcessorFactory.java:268)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:115)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> ~[flink-streaming-java-1.17.1.jar:1.17.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> 

Re: About Flink parquet format

2023-09-24 Thread Feng Jin
Hi Kamal

Indeed, Flink does not handle this exception. When it occurs, the Flink job
fails and keeps restarting internally, continuously creating new files.

Personally, I think this logic can be optimized. When this exception
occurs, the file with the exception should be deleted before the Flink job
exits, to avoid generating too many unnecessary files.


Best,
Feng

On Mon, Sep 25, 2023 at 10:27 AM Kamal Mittal 
wrote:

> Hello,
>
>
>
> Can you please share why Flink is not able to handle the exception and keeps
> creating files continuously without closing them?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Kamal Mittal via user 
> *Sent:* 21 September 2023 07:58 AM
> *To:* Feng Jin 
> *Cc:* user@flink.apache.org
> *Subject:* RE: About Flink parquet format
>
>
>
> Yes.
>
>
>
> Due to the error below, the Flink bulk writer never closes the part file and
> keeps creating new part files continuously. Is Flink not handling exceptions
> like the one below?
>
>
>
> *From:* Feng Jin 
> *Sent:* 20 September 2023 05:54 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi
>
>
>
> I tested it on my side and also got the same error. This should be a
> limitation of Parquet.
>
>
>
> ```
> java.lang.IllegalArgumentException: maxCapacityHint can't be less than initialSlabSize 64 1
> at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:57) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
> at org.apache.parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:153) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
> at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder.<init>(RunLengthBitPackingHybridEncoder.jav
> ```
>
>
>
>
>
> So I think the current minimum page size that can be set for parquet is
> 64B.
>
>
>
> Best,
>
> Feng
>
>
>
>
>
> On Tue, Sep 19, 2023 at 6:06 PM Kamal Mittal 
> wrote:
>
> Hello,
>
>
>
> If the page size is given as 1 byte, then the following exception is
> encountered: 'maxCapacityHint can't be less than initialSlabSize %d %d'.
>
> This comes from the class CapacityByteArrayOutputStream in the parquet-common
> library.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 19 September 2023 01:01 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> What exception did you encounter? I have tested it locally and it works
> fine.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
>
>
> On Mon, Sep 18, 2023 at 11:04 AM Kamal Mittal 
> wrote:
>
> Hello,
>
>
>
> Checkpointing is enabled and works fine if the configured parquet page size is
> at least 64 bytes; otherwise an exception is thrown at the back end.
>
> Looks like an issue that is not handled by the file sink bulk writer?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 15 September 2023 04:14 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> Check whether checkpointing for the job is enabled and triggered correctly. By
> default, writing parquet files rolls a new file on every checkpoint.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
> On Thu, Sep 14, 2023 at 7:27 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Tried parquet file creation with file sink bulk writer.
>
>
>
> If the parquet page size is configured as low as 1 byte (an allowed
> configuration), then Flink keeps creating multiple 'in-progress' part files
> whose content is only 'PAR1', and never closes them.
>
> I want to know the reason for not closing the file and creating multiple
> 'in-progress' part files, and why no error is reported, if applicable.
>
>
>
> Rgds,
>
> Kamal
>
>


Re: Re: Re: How to read flinkSQL job state

2023-09-24 Thread Hangxiang Yu
Hi, Yifan.
Unfortunately, IIUC, currently we can only get the key and value types by reading
the related SQL code.
I think it would be useful to support SQL semantics in the State Processor API,
but it will indeed take a lot of effort.
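
For reference, a rough sketch of the DataStream-side pattern — the state name, types,
and key type below are hypothetical — showing where the ValueStateDescriptor from the
question is declared when reading keyed state with the State Processor API; for SQL
jobs those names and types would first have to be recovered from the generated plan:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

// Reads a keyed ValueState named "count" of type Long from a savepoint/checkpoint.
public class CountStateReader extends KeyedStateReaderFunction<Long, String> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // The descriptor must match what the original job registered.
        countState = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public void readKey(Long key, Context ctx, Collector<String> out) throws Exception {
        out.collect(key + " -> " + countState.value());
    }
}
```

In recent versions this reader would then be passed to something like
SavepointReader.read(env, savepointPath, stateBackend).readKeyedState(uid, new
CountStateReader()), where the uid of the stateful operator for a SQL job also has
to be taken from the generated plan.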

On Thu, Sep 21, 2023 at 12:05 PM Yifan He via user 
wrote:

> Hi Hangxiang,
>
> I still have one question about this problem: when using the DataStream API I
> know the key and value types used in state because I defined the
> ValueStateDescriptor, but how can I get the ValueStateDescriptor in Flink SQL?
>
> Thanks,
> Yifan
>
> On 2023/09/07 06:16:41 Hangxiang Yu wrote:
> > Hi, Yifan.
> > Which Flink version are you using?
> > You are using filesystem instead of rocksdb, so your checkpoint size
> > may not be incremental, IIUC.
> >
> > On Thu, Sep 7, 2023 at 10:52 AM Yifan He via user <
> us...@flink.apache.org>
> > wrote:
> >
> > > Hi Shammon,
> > >
> > > We are using RocksDB,and the configuration is below:
> > > execution.checkpointing.externalized-checkpoint-retention:
> > > RETAIN_ON_CANCELLATION
> > > execution.checkpointing.max-concurrent-checkpoints: 1
> > > execution.checkpointing.min-pause: 0
> > > execution.checkpointing.mode: EXACTLY_ONCE
> > > execution.checkpointing.snapshot-compression: true
> > > execution.checkpointing.timeout: 6
> > > state.backend: FILESYSTEM
> > > state.backend.incremental: true
> > > state.backend.local-recovery: true
> > > state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
> > > state.backend.rocksdb.memory.managed: true
> > > state.backend.rocksdb.memory.write-buffer-ratio: 0.5
> > > state.backend.rocksdb.predefined-options: DEFAULT
> > > state.backend.rocksdb.timer-service.factory: ROCKSDB
> > > state.checkpoints.num-retained: 3
> > >
> > > Thanks,
> > > Yifan
> > >
> > > On 2023/09/06 08:00:31 Shammon FY wrote:
> > > > Hi Yifan,
> > > >
> > > > Besides reading job state, I would like to know what statebackend
> are you
> > > > using? Can you give the configurations about state and checkpoint for
> > > your
> > > > job? Maybe you can check these configuration items to confirm if
> they are
> > > > correct first.
> > > >
> > > > Best,
> > > > Shammon FY
> > > >
> > > > On Wed, Sep 6, 2023 at 3:17 PM Hang Ruan  wrote:
> > > >
> > > > > Hi, Yifan.
> > > > >
> > > > > I think the document[1] means to let us convert the DataStream to
> the
> > > > > Table[2]. Then we could handle the state with the Table API & SQL.
> > > > >
> > > > > Best,
> > > > > Hang
> > > > >
> > > > > [1]
> > > > >
> > >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/libs/state_processor_api/
> > > > > [2]
> > > > >
> > >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
> > > > >
> > > > >> Yifan He via user wrote on Wed, Sep 6, 2023, at 13:10:
> > > > >
> > > > >> Hi team,
> > > > >>
> > > > >> We are investigating why the checkpoint size of our FlinkSQL jobs
> > > keeps
> > > > >> growing and we want to look into the checkpoint file to know what
> is
> > > > >> causing the problem. I know we can use the state processor api to
> > > read the
> > > > >> state of jobs using datastream api, but how can I read the state
> of
> > > jobs
> > > > >> using table api & sql?
> > > > >>
> > > > >> Thanks,
> > > > >> Yifan
> > > > >>
> > > > >
> > > >
> > >
> >
> >
> > --
> > Best,
> > Hangxiang.
> >
>


-- 
Best,
Hangxiang.


After using the jemalloc memory allocator for a period of time, checkpoint timeouts occur and the job gets stuck

2023-09-24 Thread rui chen
After using the jemalloc memory allocator for a period of time, checkpoint
timeouts occur and the job gets stuck. Has anyone encountered this? Flink
version: flink-1.13.2, jemalloc version: 5.3.0


RE: About Flink parquet format

2023-09-24 Thread Kamal Mittal via user
Hello,

Can you please share why Flink is not able to handle the exception and keeps
creating files continuously without closing them?

Rgds,
Kamal

From: Kamal Mittal via user 
Sent: 21 September 2023 07:58 AM
To: Feng Jin 
Cc: user@flink.apache.org
Subject: RE: About Flink parquet format

Yes.

Due to the error below, the Flink bulk writer never closes the part file and keeps
creating new part files continuously. Is Flink not handling exceptions like the one
below?

From: Feng Jin <jinfeng1...@gmail.com>
Sent: 20 September 2023 05:54 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: About Flink parquet format

Hi

I tested it on my side and also got the same error. This should be a limitation 
of Parquet.

```
java.lang.IllegalArgumentException: maxCapacityHint can't be less than initialSlabSize 64 1
at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:57) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at org.apache.parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:153) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder.<init>(RunLengthBitPackingHybridEncoder.jav
```


So I think the current minimum page size that can be set for parquet is 64B.
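
For context, a rough sketch (schema, path, and class names are placeholders) of where
that page size would be configured when building a Parquet bulk writer by hand; values
much below 64 bytes trip the precondition shown above:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;

public class ParquetPageSizeExample {

    public static FileSink<GenericRecord> buildSink(Schema schema) {
        // The page size is a Parquet writer property, not a Flink option.
        ParquetWriterFactory<GenericRecord> factory = new ParquetWriterFactory<>(out ->
                AvroParquetWriter.<GenericRecord>builder(out)
                        .withSchema(schema)
                        // Values much smaller than 64 bytes fail with
                        // "maxCapacityHint can't be less than initialSlabSize".
                        .withPageSize(64)
                        .build());

        // The FileSink rolls Parquet part files on checkpoint by default.
        return FileSink.forBulkFormat(new Path("s3://my-bucket/parquet-out/"), factory).build();
    }
}
```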

Best,
Feng


On Tue, Sep 19, 2023 at 6:06 PM Kamal Mittal <kamal.mit...@ericsson.com> wrote:
Hello,

If the page size is given as 1 byte, then the following exception is encountered:
'maxCapacityHint can't be less than initialSlabSize %d %d'.

This comes from the class CapacityByteArrayOutputStream in the parquet-common
library.

Rgds,
Kamal

From: Feng Jin <jinfeng1...@gmail.com>
Sent: 19 September 2023 01:01 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: About Flink parquet format

Hi Kamal

What exception did you encounter? I have tested it locally and it works fine.


Best,
Feng


On Mon, Sep 18, 2023 at 11:04 AM Kamal Mittal <kamal.mit...@ericsson.com> wrote:
Hello,

Checkpointing is enabled and works fine if the configured parquet page size is at
least 64 bytes; otherwise an exception is thrown at the back end.

Looks like an issue that is not handled by the file sink bulk writer?

Rgds,
Kamal

From: Feng Jin <jinfeng1...@gmail.com>
Sent: 15 September 2023 04:14 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: About Flink parquet format

Hi Kamal

Check whether checkpointing for the job is enabled and triggered correctly. By
default, writing parquet files rolls a new file on every checkpoint.


Best,
Feng

On Thu, Sep 14, 2023 at 7:27 PM Kamal Mittal via user <user@flink.apache.org> wrote:
Hello,

Tried parquet file creation with file sink bulk writer.

If the parquet page size is configured as low as 1 byte (an allowed configuration),
then Flink keeps creating multiple 'in-progress' part files whose content is only
'PAR1', and never closes them.

I want to know the reason for not closing the file and creating multiple
'in-progress' part files, and why no error is reported, if applicable.

Rgds,
Kamal


After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-24 Thread rui chen
After using the jemalloc memory allocator for a period of time, checkpoint
timeouts occur and tasks get stuck. Has anyone encountered this? Flink
version: 1.13.2, jemalloc version: 5.3.0


Re: Flink File Source: File read strategy

2023-09-24 Thread Shammon FY
Hi Kirti,

I think the default file `Source` does not download files locally in Flink,
but reads them directly from S3. However, Flink also supports configuring
temporary directories through `io.tmp.dirs`. If it is a user-defined source,
the temporary directory can be obtained from FlinkS3FileSystem. After the
Flink job is completed, the directory will be cleaned up.
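
A minimal DataStream sketch (paths are placeholders, assuming Flink 1.15+): the
FileSource streams records straight from the s3:// path, and `io.tmp.dirs` only
changes where Flink itself places temporary files:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3FileSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reads lines directly from S3; no local copy of the file is made first.
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-bucket/input/"))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "s3-file-source").print();
        env.execute("read-directly-from-s3");
    }
}
```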

Best,
Shammon FY

On Fri, Sep 22, 2023 at 3:11 PM Kirti Dhar Upadhyay K via user <
user@flink.apache.org> wrote:

> Hi Community,
>
>
>
> I am using Flink File Source with Amazon S3.
>
> Please help me on below questions-
>
>
>
>    1. When the Split Enumerator assigns a split to the Source Reader, does it
>    download the file temporarily and then start reading/decoding the records
>    from the file, or does it create a direct stream with S3?
>
>    2. If it is downloaded locally, then to which path? Is it configurable?
>
>    3. Does this temporary file get deleted automatically, or is explicit
>    cleanup required?
>
>
>
>
>
> Regards,
>
> Kirti Dhar
>


Flink two-phase commit

2023-09-24 Thread 海风
A question: for Flink's two-phase commit with sink operators, at which stage of the checkpoint is the pre-commit triggered, and what work does the pre-commit actually do?