Hi Kaniska,

Not all of these are uniform across all Runners yet but since you have
previously deployed applications with the Flink Runner, here are my
answers from a Flink perspective.

** Shutdown **

For shutting down a Flink pipeline, you can use the "cancel" function:

>./bin/flink cancel <jobId>

When you submit your job in detached mode, e.g., ./bin/flink run -d
/path/to/jar, you get a job id in return which you can use with the
cancel command. Alternatively, query the running jobs via

>./bin/flink list

Very soon we will have checkpointing of sources/sinks in Beam, which
would enable you to use Flink's savepoint feature. Savepoints allow
you to take a snapshot of your Flink job at a moment in time, shut
down your application, and resume execution from that snapshot later.
This works for plain Flink jobs but not yet for Beam programs.

** Scheduling **

You'll have to set up a cron job or an external scheduling program to
run a Flink job at a specified time. There is no built-in pipeline
scheduling in Flink.
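
For example, a crontab entry along these lines would submit the job
every night at 02:00 (all paths here are placeholders):

```shell
# Hypothetical crontab entry: submit the Beam/Flink jar nightly at 02:00
# in detached mode, appending the submission output to a log file.
0 2 * * * /opt/flink/bin/flink run -d /path/to/beam-pipeline.jar >> /var/log/flink-submit.log 2>&1
```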

** Monitoring **

Flink has a nice web interface available on port 8081 of the job
manager (master) node. It contains statistics like the number of
records read/written per operator and JVM metrics.

You may also register "Accumulators", which enable you to provide your
own metrics. In the Beam API these are called "Aggregators";
aggregators are translated to Flink accumulators. For instance, you
can have an aggregator that counts the number of records written by a
particular operator.

You can see these metrics on the web interface or access them via the
Flink REST API:
https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html
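
For instance, you could poll the jobs overview with curl (a sketch;
the endpoint names follow the monitoring REST API docs above and may
differ between Flink versions, and the host name is a placeholder):

```shell
# List running and finished jobs (JobManager host is a placeholder).
curl http://jobmanager-host:8081/joboverview
# Details for a single job, including per-operator record counts.
curl http://jobmanager-host:8081/jobs/<jobId>
```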

Here is an example of an aggregator in Beam which counts the number of
elements processed:

public class TestAggregator {

  public static void main(String[] args) throws AggregatorRetrievalException {

    class MyDoFn extends DoFn<Integer, Integer> {

      Aggregator<Long, Long> agg =
          createAggregator("numRecords", new Sum.SumLongFn());

      @Override
      public void processElement(ProcessContext c) throws Exception {
        agg.addValue(1L);
      }
    }

    FlinkPipelineOptions options =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkPipelineRunner.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    MyDoFn myDoFn = new MyDoFn();
    pipeline.apply(Create.of(1, 2, 3)).apply(ParDo.of(myDoFn));

    PipelineResult result = pipeline.run();

    System.out.println("Result: "
        + result.getAggregatorValues(myDoFn.agg).getValues());
  }
}

As expected, this prints [3].


Cheers,
Max

On Mon, Apr 25, 2016 at 7:25 PM, kaniska Mandal
<[email protected]> wrote:
> What's the recommended approach
>
>> to reliably shut down the pipeline
>
>> to run the beam-flink pipeline in a scheduled manner
>
>> to monitor the rates/throughputs/throttling/multiple threads spawned - by
>> Pipeline, any suggestion?
>
> Thanks
> Kaniska
