Re: Move already processed file from one folder to another folder in flink
Hi,

I did not understand: why are you using a table when we are working on a program?
Re: Move already processed file from one folder to another folder in flink
Hi!

For the UDF solution, you can add a "file name" column to your csv file like this:

    id,value,filename
    1,100,
    2,200,
    3,300,test.csv

Only the filename column of the last record in the csv file is filled, so that record marks the end of the file.

Then write a UDF like this:

    public class MyUDF extends ScalarFunction {
        public String eval(String filename) {
            if (filename != null && filename.length() > 0) {
                // do the file renaming here
            }
            return filename;
        }
    }

Suppose your original SQL is:

    SELECT id, value FROM csvSource;

Then you can change it to:

    SELECT id, value, MyUDF(filename) FROM csvSource;

This will, of course, add a "useless" column, and you'll have to deal with it in your result file.

For more on UDFs, see
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
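The sketch above leaves the actual renaming as a comment. As an illustration only (the helper class, method, and folder arguments are my own assumptions, not something prescribed in this thread), the body of that comment could use java.nio's Files.move:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FileArchiver {

    // Moves a fully processed csv file from the input folder to the
    // historic folder. Folder layout and names are illustrative.
    public static Path moveToHistoric(Path inputDir, Path historicDir,
                                      String filename) throws IOException {
        Files.createDirectories(historicDir); // no-op if it already exists
        // REPLACE_EXISTING keeps a restarted job that re-reads the same
        // file from failing on a second move attempt.
        return Files.move(inputDir.resolve(filename),
                          historicDir.resolve(filename),
                          StandardCopyOption.REPLACE_EXISTING);
    }
}
```

A call like FileArchiver.moveToHistoric(inputDir, historicDir, filename) would then replace the "// do the file renaming here" comment inside eval.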
Re: Move already processed file from one folder to another folder in flink
Hi,

Let me know if you have any ideas, as this is very critical for my project.

Thanks & Regards,
Samir Vasani
Re: Move already processed file from one folder to another folder in flink
Hi,

Can you elaborate more on the UDF, as I did not understand it?

Thanks & Regards,
Samir Vasani
Re: Move already processed file from one folder to another folder in flink
Hi!

In this case it won't work, as JobListener#onJobExecuted will only be called when the job finishes, successfully or unsuccessfully.

For a forever-running job I would suggest adding a UDF right after the source and adding a special "EOF" record to each of the csv files. This UDF monitors the data flowing through it, and when it sees the EOF record it moves the file.
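The core of this "EOF record" idea can be sketched without any Flink classes. The class, method, and folder names below are my own illustrative assumptions; in the real pipeline this check would run inside a function placed right after the source, one record at a time:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class EofRecordHandler {

    private final Path inputDir;
    private final Path historicDir;

    public EofRecordHandler(Path inputDir, Path historicDir) {
        this.inputDir = inputDir;
        this.historicDir = historicDir;
    }

    // filenameColumn is empty on ordinary records and carries the file
    // name only on the special last ("EOF") record. Returns true when
    // the file was moved, i.e. the end of the file was reached.
    public boolean onRecord(String filenameColumn) throws IOException {
        if (filenameColumn == null || filenameColumn.isEmpty()) {
            return false; // ordinary record: pass through, nothing to do
        }
        Files.createDirectories(historicDir);
        Files.move(inputDir.resolve(filenameColumn),
                   historicDir.resolve(filenameColumn),
                   StandardCopyOption.REPLACE_EXISTING);
        return true;
    }
}
```

Because the job never finishes, the move happens per file as each EOF record flows through, rather than at job shutdown.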
Re: Move already processed file from one folder to another folder in flink
Hi Caizhi Weng,

Thanks for your input. Let me explain the requirement in a little more detail.

The Flink pipeline will be running forever (until some issue happens and we need to restart it), so it will continuously monitor whether a new file has arrived in the *input* folder.

In this case, will your suggestion work?

Thanks & Regards,
Samir Vasani
Re: Move already processed file from one folder to another folder in flink
Hi!

JobListener#onJobExecuted might help, if your job is not a forever-running streaming job. See
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
Move already processed file from one folder to another folder in flink
Hi,

I am a newbie to Flink and facing some challenges solving the use case below.

Use case description:

Every day I will receive a csv file with a timestamp in its name in some folder, say *input*. The file name format would be *file_name_dd-mm-yy-hh-mm-ss.csv*.

My Flink pipeline will read this csv file row by row, and each row will be written to my Kafka topic.

Once the pipeline has read the entire file, the file needs to be moved to another folder, say *historic*, so that I can keep the *input* folder empty for the next file.

I googled a lot but did not find anything, so can you guide me on how to achieve this?

Let me know if anything else is required.

Samir Vasani