Hi Kaniska,

Not all of these are uniform across all Runners yet, but since you have previously deployed applications with the Flink Runner, here are my answers from a Flink perspective.
** Shutdown **

For shutting down a Flink pipeline, you can use the "cancel" command:

>./bin/flink cancel <jobId>

When you submit your job in detached mode, e.g.

>./bin/flink run -d /path/to/jar

you get a job id in return which you can use for the cancel command. Alternatively, query the running jobs via

>./bin/flink list

Very soon we will have checkpointing of sources/sinks in Beam. That will enable you to use Flink's savepoint feature. Savepoints let you take a snapshot of your Flink job at a given point in time, shut down your application, and resume execution from that snapshot later. This works for Flink programs but not yet for Beam programs.

** Scheduling **

You'll have to set up a cron job or an external scheduling program to run a Flink job at a specified time. There is no built-in pipeline scheduling in Flink.

** Monitoring **

Flink has a nice web interface, available on port 8081 of the job manager (master) node. It contains statistics like the number of records read/written per operator, as well as JVM metrics. You may also register "Accumulators", which let you provide your own metrics. In the Beam API these are called "Aggregators"; Aggregators get translated to Flink Accumulators. For instance, you can have an aggregator that counts the number of records written by a particular operator.
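To make the scheduling suggestion concrete, here is a minimal sketch of a crontab entry that submits the job once a day. The paths (/opt/flink, /path/to/beam-job.jar, the log file) are placeholder assumptions, not anything from your setup:

```shell
# Hypothetical crontab entry (install with `crontab -e`):
# submit the Beam job to Flink every day at 02:00.
# /opt/flink and /path/to/beam-job.jar are placeholders -- adjust
# them to your installation.
0 2 * * * /opt/flink/bin/flink run -d /path/to/beam-job.jar >> /var/log/flink-cron.log 2>&1
```

Detached mode (-d) matters here so that cron does not keep a foreground process around for the lifetime of the job.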
These accumulator metrics are visible on the web interface, and you can also access them via the Flink REST API:

https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html

Here is an example of an aggregator in Beam which counts the number of elements processed:

public class TestAggregator {

  public static void main(String[] args) throws AggregatorRetrievalException {

    class MyDoFn extends DoFn<Integer, Integer> {
      Aggregator<Long, Long> agg =
          createAggregator("numRecords", new Sum.SumLongFn());

      @Override
      public void processElement(ProcessContext c) throws Exception {
        agg.addValue(1L);
      }
    }

    FlinkPipelineOptions options =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkPipelineRunner.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    MyDoFn myDoFn = new MyDoFn();
    pipeline.apply(Create.of(1, 2, 3)).apply(ParDo.of(myDoFn));

    PipelineResult result = pipeline.run();
    System.out.println(
        "Result: " + result.getAggregatorValues(myDoFn.agg).getValues());
  }
}

As expected, this prints [3].

Cheers,
Max

On Mon, Apr 25, 2016 at 7:25 PM, kaniska Mandal <[email protected]> wrote:

> What's the recommended approach
>
>> to reliably shut down the pipeline
>
>> to run the beam-flink pipeline in a scheduled manner
>
>> to monitor the rates/throughputs/throttling/multiple threads spawned by
>> Pipeline, any suggestion?
>
> Thanks
> Kaniska
