Hi,

it looks like you are executing two distinct Flink jobs.
DataSet.count() triggers the execution of a new job. If you have an
execute() call in your program, this will lead to two Flink jobs being
executed.
It is not possible to share state among these jobs.

Maybe you should add a custom count implementation (using a ReduceFunction)
which is executed in the same program as the other ReduceFunction.

Best, Fabian



2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:

> Hi,
>
> I see that an InputFormat's open() and nextRecord() methods get called for
> each terminal operation on a given dataset using that particular
> InputFormat. Is it possible to avoid this - possibly using some caching
> technique in Flink?
>
> For example, I've some code like below and I see for both the last two
> statements (reduce() and count()) the above methods in the input format get
> called. Btw. this is a custom input format I wrote to represent a binary
> matrix stored as Short values.
>
> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>
> DataSet<Short[]> ds = env.createInput(smif, 
> BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>
> MapOperator<Short[], DoubleStatistics> op = ds.map(...)
>
> *op.reduce(...)*
>
> *op.count(...)*
>
>
> Thank you,
> Saliya
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> Cell 812-391-4914
> http://saliya.org
>

Reply via email to