Re: Flink File Source: File read strategy

2023-09-24 Thread Shammon FY
Hi Kirti,

I think you can refer to the doc [1] and create a table on your S3 file system
(put your S3 path in the `path` option), then submit jobs to write and read
data through S3.

You can refer to [2] if your jobs use the `DataStream` API.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
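
For illustration, here is a minimal sketch combining both routes, assuming the S3
filesystem plugin is already set up as described in [2]; the bucket, path, schema,
and format below are hypothetical:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class S3ReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Table/SQL route from [1]: a filesystem table whose `path` points at S3.
        tEnv.executeSql(
            "CREATE TABLE s3_events (id BIGINT, msg STRING) WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = 's3://my-bucket/events/',"   // hypothetical bucket/prefix
                + " 'format' = 'csv')");
        tEnv.executeSql("SELECT * FROM s3_events").print();

        // DataStream route from [2]: the file source reads the same S3 path directly.
        FileSource<String> source = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-bucket/events/"))
            .build();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "s3-file-source").print();
        env.execute("s3-read-sketch");
    }
}
```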

Best,
Shammon FY

On Mon, Sep 25, 2023 at 12:36 PM Kirti Dhar Upadhyay K <
kirti.k.dhar.upadh...@ericsson.com> wrote:

> Thanks Shammon.
>
> Is there any way to verify that File Source reads files directly from S3?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>
> *From:* Shammon FY 
> *Sent:* 25 September 2023 06:27
> *To:* Kirti Dhar Upadhyay K 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink File Source: File read strategy
>
>
>
> Hi Kirti,
>
>
>
> I think the default file `Source` does not download files locally
> in Flink, but reads them directly from S3. However, Flink also supports
> configuring temporary directories through `io.tmp.dirs`. If it is a
> user-defined source, it can be obtained from FlinkS3FileSystem. After the
> Flink job is completed, the directory will be cleaned up.
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Fri, Sep 22, 2023 at 3:11 PM Kirti Dhar Upadhyay K via user <
> user@flink.apache.org> wrote:
>
> Hi Community,
>
>
>
> I am using Flink File Source with Amazon S3.
>
> Please help me with the questions below:
>
>    1. When the Split Enumerator assigns a split to the Source Reader, does it
>    download the file to a temporary location before reading/decoding the
>    records, or does it stream directly from S3?
>
>    2. If it is downloaded locally, to which path? Is it configurable?
>
>    3. Is this temporary file deleted automatically, or is explicit cleanup
>    required?
>
>
>
>
>
> Regards,
>
> Kirti Dhar
>
>


RE: Flink File Source: File read strategy

2023-09-24 Thread Kirti Dhar Upadhyay K via user
Thanks Shammon.
Is there any way to verify that File Source reads files directly from S3?

Regards,
Kirti Dhar

From: Shammon FY 
Sent: 25 September 2023 06:27
To: Kirti Dhar Upadhyay K 
Cc: user@flink.apache.org
Subject: Re: Flink File Source: File read strategy

Hi Kirti,

I think the default file `Source` does not download files locally in Flink, but 
reads them directly from S3. However, Flink also supports configuring temporary 
directories through `io.tmp.dirs`. If it is a user-defined source, it can be 
obtained from FlinkS3FileSystem. After the Flink job is completed, the 
directory will be cleaned up.

Best,
Shammon FY

On Fri, Sep 22, 2023 at 3:11 PM Kirti Dhar Upadhyay K via user
<user@flink.apache.org> wrote:
Hi Community,

I am using Flink File Source with Amazon S3.
Please help me with the questions below:

  1.  When the Split Enumerator assigns a split to the Source Reader, does it
download the file to a temporary location before reading/decoding the records,
or does it stream directly from S3?

  2.  If it is downloaded locally, to which path? Is it configurable?

  3.  Is this temporary file deleted automatically, or is explicit cleanup
required?


Regards,
Kirti Dhar


Re: Side outputs documentation

2023-09-24 Thread Yunfeng Zhou
Hi Alexis,

If you create an OutputTag with the constructor `OutputTag(String id)`,
you need to make it an anonymous subclass so that Flink can analyze the
type information. But if you use the constructor `OutputTag(String id,
TypeInformation typeInfo)`, it need not be anonymous, since you have
already provided the type information.

The second constructor was introduced after the documentation and the first
constructor, so I think the documentation may be outdated and no longer
match OutputTag's current behavior. A ticket and PR could be opened to fix
the document. What do you think?
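
For illustration, a small sketch of the two variants (the tag name and element
type are hypothetical):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.OutputTag;

public class OutputTagSketch {
    // One-argument constructor: the {} creates an anonymous subclass so that
    // Flink can extract the element type from the generic signature.
    static final OutputTag<String> LATE_ANONYMOUS = new OutputTag<String>("late-events") {};

    // Two-argument constructor: the type information is supplied explicitly,
    // so no anonymous subclass is needed.
    static final OutputTag<String> LATE_EXPLICIT =
            new OutputTag<>("late-events", Types.STRING);
}
```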

Best,
Yunfeng

On Fri, Sep 22, 2023 at 4:55 PM Alexis Sarda-Espinosa
 wrote:
>
> Hello,
>
> very quick question, the documentation for side outputs states that an 
> OutputTag "needs to be an anonymous inner class, so that we can analyze the 
> type" (this is written in a comment in the example). Is this really true? 
> I've seen many examples where it's a static element and it seems to work fine.
>
> Regards,
> Alexis.
>


Re: About Flink parquet format

2023-09-24 Thread Feng Jin
Hi Kamal

Indeed, Flink does not handle this exception. When it occurs, the Flink job
fails and keeps restarting internally, continuously creating new files.

Personally, I think this logic could be improved: when the exception occurs,
the file that triggered it should be deleted before the Flink job exits, to
avoid generating too many unnecessary files.
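
For reference, the quoted exchange below traces the failure to a Parquet page
size below 64 bytes. Here is a sketch of pinning the page size on the bulk
writer via the standard `parquet.page.size` property; the row schema and output
path are hypothetical, and flink-parquet plus the Hadoop client are assumed to
be on the classpath:

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

public class ParquetSinkSketch {
    static FileSink<RowData> buildSink() {
        // Hypothetical schema: (id BIGINT, msg STRING).
        RowType rowType = RowType.of(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));

        Configuration hadoopConf = new Configuration();
        // Keep the page size well above 64 bytes (Parquet's default is 1 MB);
        // smaller values hit the maxCapacityHint/initialSlabSize check in parquet-common.
        hadoopConf.setInt("parquet.page.size", 64 * 1024);

        return FileSink
            .forBulkFormat(
                new Path("s3://my-bucket/parquet-out/"), // hypothetical output path
                ParquetRowDataBuilder.createWriterFactory(rowType, hadoopConf, true))
            .build();
    }
}
```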


Best,
Feng

On Mon, Sep 25, 2023 at 10:27 AM Kamal Mittal 
wrote:

> Hello,
>
>
>
> Can you please explain why Flink is not able to handle the exception and
> keeps creating files continuously without closing them?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Kamal Mittal via user 
> *Sent:* 21 September 2023 07:58 AM
> *To:* Feng Jin 
> *Cc:* user@flink.apache.org
> *Subject:* RE: About Flink parquet format
>
>
>
> Yes.
>
>
>
> Due to the error below, the Flink bulk writer never closes the part file and
> keeps creating new part files continuously. Does Flink not handle exceptions
> like the one below?
>
>
>
> *From:* Feng Jin 
> *Sent:* 20 September 2023 05:54 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi
>
>
>
> I tested it on my side and also got the same error. This should be a
> limitation of Parquet.
>
>
>
> ```
> java.lang.IllegalArgumentException: maxCapacityHint can't be less than initialSlabSize 64 1
>     at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:57) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
>     at org.apache.parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:153) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
>     at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder.<init>(RunLengthBitPackingHybridEncoder.jav
> ```
>
>
>
>
>
> So I think the current minimum page size that can be set for Parquet is
> 64 bytes.
>
>
>
> Best,
>
> Feng
>
>
>
>
>
> On Tue, Sep 19, 2023 at 6:06 PM Kamal Mittal 
> wrote:
>
> Hello,
>
>
>
> If the page size is set to 1 byte, then the following exception is
> encountered: ‘maxCapacityHint can't be less than initialSlabSize %d %d’.
>
>
>
> This comes from the class CapacityByteArrayOutputStream in the
> parquet-common library.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 19 September 2023 01:01 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> What exception did you encounter? I have tested it locally and it works
> fine.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
>
>
> On Mon, Sep 18, 2023 at 11:04 AM Kamal Mittal 
> wrote:
>
> Hello,
>
>
>
> Checkpointing is enabled and works fine if the configured Parquet page size is
> at least 64 bytes; otherwise an exception is thrown at the back end.
>
>
>
> This looks like an issue that is not handled by the file sink bulk writer?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 15 September 2023 04:14 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> Check whether checkpointing is enabled for the job and triggered correctly. By
> default, the Parquet writer rolls a new file on each checkpoint.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
> On Thu, Sep 14, 2023 at 7:27 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Tried parquet file creation with file sink bulk writer.
>
>
>
> If the Parquet page size is configured as low as 1 byte (an allowed
> configuration), then Flink keeps creating multiple ‘in-progress’ part files
> whose only content is ‘PAR1’, and never closes them.
>
>
>
> I want to know why the file is not closed and multiple ‘in-progress’ part
> files are created, and why no error is reported, if applicable.
>
>
>
> Rgds,
>
> Kamal
>
>


Re: Re: Re: How to read flinkSQL job state

2023-09-24 Thread Hangxiang Yu
Hi, Yifan.
Unfortunately, IIUC, we can currently get the key and value types only by
reading the related SQL code.
I think it would be useful to support SQL semantics in the State Processor
API, but it will indeed take a lot of effort.
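
For context, below is a sketch of what reading keyed state looks like with the
State Processor API when the descriptor is known. For a SQL job the operator uid
and state name are generated by the planner, which is exactly the missing piece;
every name and path in this sketch is hypothetical, and it assumes the
DataStream-based reader available in recent Flink versions:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadKeyedStateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical retained checkpoint path.
        SavepointReader savepoint = SavepointReader.read(
                env, "s3://my-bucket/checkpoints/chk-42", new HashMapStateBackend());

        savepoint.readKeyedState("my-operator-uid", new KeyedStateReaderFunction<Long, String>() {
            private ValueState<String> state;

            @Override
            public void open(Configuration parameters) {
                // The descriptor must match the one the running job used --
                // for a SQL job this is what is hard to know up front.
                state = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("my-state", Types.STRING));
            }

            @Override
            public void readKey(Long key, Context ctx, Collector<String> out) throws Exception {
                out.collect(key + " -> " + state.value());
            }
        }).print();

        env.execute("read-keyed-state-sketch");
    }
}
```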

On Thu, Sep 21, 2023 at 12:05 PM Yifan He via user 
wrote:

> Hi Hangxiang,
>
> I still have one question about this problem: when using the DataStream API I
> know the key and value types I use in state because I defined the
> ValueStateDescriptor, but how can I get the ValueStateDescriptor in Flink SQL?
>
> Thanks,
> Yifan
>
> On 2023/09/07 06:16:41 Hangxiang Yu wrote:
> > Hi, Yifan.
> > Which Flink version are you using?
> > You are using the filesystem state backend instead of RocksDB, so your
> > checkpoint size may not be incremental, IIUC.
> >
> > On Thu, Sep 7, 2023 at 10:52 AM Yifan He via user <
> us...@flink.apache.org>
> > wrote:
> >
> > > Hi Shammon,
> > >
> > > We are using RocksDB,and the configuration is below:
> > > execution.checkpointing.externalized-checkpoint-retention:
> > > RETAIN_ON_CANCELLATION
> > > execution.checkpointing.max-concurrent-checkpoints: 1
> > > execution.checkpointing.min-pause: 0
> > > execution.checkpointing.mode: EXACTLY_ONCE
> > > execution.checkpointing.snapshot-compression: true
> > > execution.checkpointing.timeout: 6
> > > state.backend: FILESYSTEM
> > > state.backend.incremental: true
> > > state.backend.local-recovery: true
> > > state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
> > > state.backend.rocksdb.memory.managed: true
> > > state.backend.rocksdb.memory.write-buffer-ratio: 0.5
> > > state.backend.rocksdb.predefined-options: DEFAULT
> > > state.backend.rocksdb.timer-service.factory: ROCKSDB
> > > state.checkpoints.num-retained: 3
> > >
> > > Thanks,
> > > Yifan
> > >
> > > On 2023/09/06 08:00:31 Shammon FY wrote:
> > > > Hi Yifan,
> > > >
> > > > Besides reading job state, I would like to know which state backend you
> > > > are using. Can you share your job's state and checkpoint configuration?
> > > > Maybe you can check these configuration items first to confirm they are
> > > > correct.
> > > >
> > > > Best,
> > > > Shammon FY
> > > >
> > > > On Wed, Sep 6, 2023 at 3:17 PM Hang Ruan  wrote:
> > > >
> > > > > Hi, Yifan.
> > > > >
> > > > > I think the document [1] means that we should convert the DataStream to
> > > > > a Table [2]. Then we could handle the state with the Table API & SQL.
> > > > >
> > > > > Best,
> > > > > Hang
> > > > >
> > > > > [1]
> > > > >
> > >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/libs/state_processor_api/
> > > > > [2]
> > > > >
> > >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
> > > > >
> > > > > Yifan He via user  wrote on Wed, Sep 6, 2023 at 13:10:
> > > > >
> > > > >> Hi team,
> > > > >>
> > > > >> We are investigating why the checkpoint size of our FlinkSQL jobs
> > > keeps
> > > > >> growing and we want to look into the checkpoint file to know what
> is
> > > > >> causing the problem. I know we can use the state processor api to
> > > read the
> > > > >> state of jobs using datastream api, but how can I read the state
> of
> > > jobs
> > > > >> using table api & sql?
> > > > >>
> > > > >> Thanks,
> > > > >> Yifan
> > > > >>
> > > > >
> > > >
> > >
> >
> >
> > --
> > Best,
> > Hangxiang.
> >
>


-- 
Best,
Hangxiang.


RE: About Flink parquet format

2023-09-24 Thread Kamal Mittal via user
Hello,

Can you please explain why Flink is not able to handle the exception and keeps
creating files continuously without closing them?

Rgds,
Kamal

From: Kamal Mittal via user 
Sent: 21 September 2023 07:58 AM
To: Feng Jin 
Cc: user@flink.apache.org
Subject: RE: About Flink parquet format

Yes.

Due to the error below, the Flink bulk writer never closes the part file and
keeps creating new part files continuously. Does Flink not handle exceptions
like the one below?

From: Feng Jin <jinfeng1...@gmail.com>
Sent: 20 September 2023 05:54 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: About Flink parquet format

Hi

I tested it on my side and also got the same error. This should be a limitation 
of Parquet.

```
java.lang.IllegalArgumentException: maxCapacityHint can't be less than initialSlabSize 64 1
    at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:57) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:153) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder.<init>(RunLengthBitPackingHybridEncoder.jav
```


So I think the current minimum page size that can be set for Parquet is 64 bytes.

Best,
Feng


On Tue, Sep 19, 2023 at 6:06 PM Kamal Mittal
<kamal.mit...@ericsson.com> wrote:
Hello,

If the page size is set to 1 byte, then the following exception is encountered:
‘maxCapacityHint can't be less than initialSlabSize %d %d’.

This comes from the class CapacityByteArrayOutputStream in the
parquet-common library.

Rgds,
Kamal

From: Feng Jin <jinfeng1...@gmail.com>
Sent: 19 September 2023 01:01 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: About Flink parquet format

Hi Kamal

What exception did you encounter? I have tested it locally and it works fine.


Best,
Feng


On Mon, Sep 18, 2023 at 11:04 AM Kamal Mittal
<kamal.mit...@ericsson.com> wrote:
Hello,

Checkpointing is enabled and works fine if the configured Parquet page size is
at least 64 bytes; otherwise an exception is thrown at the back end.

This looks like an issue that is not handled by the file sink bulk writer?

Rgds,
Kamal

From: Feng Jin <jinfeng1...@gmail.com>
Sent: 15 September 2023 04:14 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: About Flink parquet format

Hi Kamal

Check whether checkpointing is enabled for the job and triggered correctly. By
default, the Parquet writer rolls a new file on each checkpoint.


Best,
Feng

On Thu, Sep 14, 2023 at 7:27 PM Kamal Mittal via user
<user@flink.apache.org> wrote:
Hello,

Tried parquet file creation with file sink bulk writer.

If the Parquet page size is configured as low as 1 byte (an allowed
configuration), then Flink keeps creating multiple ‘in-progress’ part files
whose only content is ‘PAR1’, and never closes them.

I want to know why the file is not closed and multiple ‘in-progress’ part files
are created, and why no error is reported, if applicable.

Rgds,
Kamal


After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-24 Thread rui chen
After using the jemalloc memory allocator for a period of time, checkpoint
timeouts occur and tasks get stuck. Has anyone encountered this? Flink
version: 1.13.2, jemalloc version: 5.3.0


Re: Flink File Source: File read strategy

2023-09-24 Thread Shammon FY
Hi Kirti,

I think the default file `Source` does not download files locally in Flink,
but reads them directly from S3. However, Flink also supports configuring
temporary directories through `io.tmp.dirs`. If it is a user-defined source,
the temporary directory can be obtained from FlinkS3FileSystem. After the
Flink job is completed, the directory will be cleaned up.
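
As a rough way to verify this, one can point `io.tmp.dirs` at a directory that is
easy to watch and confirm it stays empty while the job reads. A small sketch below,
with a hypothetical local path; in a cluster this option is normally set in
flink-conf.yaml rather than in code:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TmpDirsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // io.tmp.dirs defaults to the system temp directory on each TaskManager.
        conf.set(CoreOptions.TMP_DIRS, "/data/flink-tmp"); // hypothetical path
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and run the job; if the file source streams from S3 directly,
        // this directory should stay empty while the job is reading.
    }
}
```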

Best,
Shammon FY

On Fri, Sep 22, 2023 at 3:11 PM Kirti Dhar Upadhyay K via user <
user@flink.apache.org> wrote:

> Hi Community,
>
>
>
> I am using Flink File Source with Amazon S3.
>
> Please help me with the questions below:
>
>    1. When the Split Enumerator assigns a split to the Source Reader, does it
>    download the file to a temporary location before reading/decoding the
>    records, or does it stream directly from S3?
>
>    2. If it is downloaded locally, to which path? Is it configurable?
>
>    3. Is this temporary file deleted automatically, or is explicit cleanup
>    required?
>
>
>
>
>
> Regards,
>
> Kirti Dhar
>