Hi,
Well yes, I could probably make it work with a constant number of operators
(and consequently buffers) by developing specific input and output classes, and
that way I'd have a workaround for the buffer issue.
The size of my job is input-dependent mostly because my code creates one full
processing chain per input folder (there is one folder per hour of data) and
that processing chain has many functions. The number of input folders is an
argument to the software and can vary from 1 to hundreds (when we re-compute
something like a month of data).
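
    for (String folder : folders) {
        // one environment, and therefore one job, per input folder
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.readTextFile(folder).[......].writeAsText(folder + "_processed");
        env.execute();
    }
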
There is one full processing chain per folder (a loop is creating them) because
the name of the output is the same as the name of the input, and I did not want
to create a specific "rolling-output" with bucketers.
So, yes, I could develop a source that supports a list of folders and puts the
source name into the produced data. My processing could also handle tuples
where one field is the source folder and use it as a key where appropriate, and
finally yes I could also create a sink that will "dispatch" the datasets to the
correct folder according to that field of the tuple.
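
To make that alternative concrete, here is a minimal sketch in plain Java. It deliberately uses no Flink API: the class `FolderDispatchSketch`, the `Tagged` type, the three methods, and the folder names are all hypothetical stand-ins for the source, the processing chain, and the dispatching sink described above. The upper-casing step is just a placeholder for the real processing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FolderDispatchSketch {

    /** A record tagged with the folder it was read from (hypothetical type). */
    static final class Tagged {
        final String folder;
        final String value;

        Tagged(String folder, String value) {
            this.folder = folder;
            this.value = value;
        }
    }

    /** "Source": merges all folders into one collection, tagging each line with its folder. */
    static List<Tagged> readAll(Map<String, List<String>> linesByFolder) {
        List<Tagged> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : linesByFolder.entrySet()) {
            for (String line : e.getValue()) {
                out.add(new Tagged(e.getKey(), line));
            }
        }
        return out;
    }

    /** "Processing": placeholder transformation that carries the folder tag along. */
    static List<Tagged> process(List<Tagged> in) {
        List<Tagged> out = new ArrayList<>();
        for (Tagged t : in) {
            out.add(new Tagged(t.folder, t.value.toUpperCase()));
        }
        return out;
    }

    /** "Sink": routes each record to "<folder>_processed" based on its tag. */
    static Map<String, List<String>> dispatch(List<Tagged> in) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Tagged t : in) {
            out.computeIfAbsent(t.folder + "_processed", k -> new ArrayList<>()).add(t.value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Two made-up hourly folders with a few lines each.
        Map<String, List<String>> input = new LinkedHashMap<>();
        input.put("hour00", Arrays.asList("a", "b"));
        input.put("hour01", Arrays.asList("c"));

        // A single chain handles every folder; the tag decides the output location.
        System.out.println(dispatch(process(readAll(input))));
    }
}
```

The point of the sketch is that the number of processing stages stays constant however many folders are passed in; only the data volume grows.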
But at first that seemed too complicated (premature optimization), so I coded
my job the "naïve" way and then split it, because I thought that there would be
some sort of recycling of those buffers between jobs.
If the recycling works under normal conditions, maybe the question is whether
it also works when multiple jobs are run from within the same jar, versus
running the jar multiple times?
PS: I don't have log files at hand; the app is run on a separate, secured
platform. Sure, I could try to reproduce the issue with a mock app, but I'm not
sure it would help the discussion.