Re: Move already processed file from one folder to another folder in flink

2021-09-01 Thread Samir Vasani
Hi,

I did not understand: why are you using a table (SQL) solution when we are
working on a DataStream program?


Re: Move already processed file from one folder to another folder in flink

2021-07-25 Thread Caizhi Weng
Hi!

For the UDF solution, you can add a "file name" column to your csv file
like this:
id,value,filename
1,100,
2,200,
3,300,test.csv

Only the last record of the csv file has its filename column filled; this
marks the end of the file.

Then write a UDF like this:

public class MyUDF extends ScalarFunction {
  public String eval(String filename) {
    if (filename != null && !filename.isEmpty()) {
      // do the file renaming here
    }
    return filename;
  }
}

Suppose your original SQL is like
SELECT id, value FROM csvSource;

Then you can change your SQL to be
SELECT id, value, MyUDF(filename) FROM csvSource;

This will, of course, add in a "useless" column and you'll have to deal
with it in your result file.

For more on UDFs, see
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
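A sketch of what the "do the file renaming here" branch might contain, using
plain `java.nio` (the `FileMover` class, method name, and folder arguments
are hypothetical helpers for illustration, not part of the original reply):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FileMover {
    // Moves a processed file from the input folder to the historic folder.
    // Folder layout and names are assumptions; adjust to your setup.
    public static Path moveToHistoric(Path inputDir, Path historicDir, String filename)
            throws IOException {
        Files.createDirectories(historicDir); // ensure the target folder exists
        Path source = inputDir.resolve(filename);
        Path target = historicDir.resolve(filename);
        // REPLACE_EXISTING avoids a failure if a retried task moves the same file twice
        return Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Inside `eval`, the UDF could call something like `FileMover.moveToHistoric(...)`
when the filename column is non-empty.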



Re: Move already processed file from one folder to another folder in flink

2021-07-24 Thread Samir Vasani
Hi,
Let me know if you have any ideas, as this is very critical for my project.

Thanks & Regards,
Samir Vasani






Re: Move already processed file from one folder to another folder in flink

2021-07-23 Thread Samir Vasani
Hi,

Can you elaborate more on the UDF approach? I did not understand it.

Thanks & Regards,
Samir Vasani





Re: Move already processed file from one folder to another folder in flink

2021-07-23 Thread Caizhi Weng
Hi!

In this case it won't work, as JobListener#onJobExecuted will only be
called when the job finishes, successfully or unsuccessfully.

For a forever-running job I would suggest adding a UDF right after the
source and adding a special "EOF" record at the end of each csv file. This UDF
monitors the data flowing through it, and when it sees the EOF record it
moves the file.
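As a minimal, framework-free sketch of this EOF-marker idea (the
`EOF,<filename>` record format, the `EofWatcher` class, and the callback are
assumptions for illustration, not from the original reply): a pass-through
function watches each record and fires a callback, which is where the file
move would go, when the marker arrives. In Flink this logic would live in a
map/process function placed right after the source.

```java
import java.util.function.Consumer;

public class EofWatcher {
    private static final String EOF_MARKER = "EOF"; // hypothetical marker value
    private final Consumer<String> onEof;           // e.g. a lambda that moves the file

    public EofWatcher(Consumer<String> onEof) {
        this.onEof = onEof;
    }

    // Pass each csv line through unchanged; when the EOF record arrives
    // ("EOF,<filename>"), invoke the callback with the filename it carries.
    public String process(String line) {
        if (line.startsWith(EOF_MARKER + ",")) {
            onEof.accept(line.substring(EOF_MARKER.length() + 1));
        }
        return line;
    }
}
```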



Re: Move already processed file from one folder to another folder in flink

2021-07-23 Thread Samir Vasani
Hi Caizhi Weng,

Thanks for your input.
Let me explain the requirement in a little more detail.
The Flink pipeline will be running forever (until some issue happens and we
need to restart it), so it will continuously monitor whether a new file
arrives in the *input* folder.
In this case, will your suggestion work?


Thanks & Regards,
Samir Vasani





Re: Move already processed file from one folder to another folder in flink

2021-07-23 Thread Caizhi Weng
Hi!

JobListener#onJobExecuted might help, if your job is not a forever-running
streaming job. See
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
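A minimal sketch of this JobListener approach for a bounded job, assuming a
Flink runtime on the classpath; the move itself is only indicated by a
comment, and the pipeline-building part is elided:

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ListenerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // nothing to do on submission
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                if (throwable == null) {
                    // the bounded job finished successfully:
                    // move the processed file from "input" to "historic" here
                }
            }
        });
        // ... build the pipeline, then call env.execute() ...
    }
}
```

Note that, as discussed below in the thread, this only fires when the job
finishes, so it does not fit a forever-running streaming job.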



Move already processed file from one folder to another folder in flink

2021-07-23 Thread Samir Vasani
Hi,

I am a newbie to Flink and facing some challenges solving the use case below.

Use Case description:

I will receive a csv file with a timestamp every day in some folder, say
*input*. The file name format would be *file_name_dd-mm-yy-hh-mm-ss.csv*.

My Flink pipeline will read this csv file row by row, and each row will be
written to my Kafka topic.

Once the pipeline has read the entire file, the file needs to be moved to
another folder, say *historic*, so that I can keep the *input* folder empty
for the next file.

I googled a lot but did not find anything, so can you guide me on how to
achieve this?

Let me know if anything else is required.


Samir Vasani