Hi Max,

Many thanks for the great explanations.

*A few questions regarding 'Execution Strategy of a Group of Similar / Disparate Beam Pipelines'*

> Is it feasible to put a load balancer in front of the 'Beam-Flink Pipeline Executor Process' in order to control rate limits / throttling?

> Should we maintain multiple containers of 'Beam-Flink Pipelines', or a single container with multiple instances of a Beam-Flink Pipeline?

> Is it a good idea to use an external 'Process Flow Controller' like Apache NiFi to wire the Beam Pipelines together and launch / halt / resume them programmatically / interactively?

*More questions related to graceful shutdown and restart*

Currently FlinkPipelineRunner#run() throws a RuntimeException.

> So is it good enough to add a shutdown hook to the Beam Pipeline and close resources like KafkaProducer when the Pipeline job is killed due to a RuntimeException?

It would have been better if a custom exception were thrown, so that the job could handle it gracefully!

BTW, I tried calling close() on FlinkKafkaProducer from inside the Beam-Flink Pipeline Runner, but the producer didn't stop.

> I understand we can use monitd to restart a process, but do you have any suggestions for implementing an external 'Beam Pipeline Monitoring Agent' that automatically retries restarting the Pipeline?

Let me know if any of the above points sound logical; then I'll go ahead and create a feature request.

Thanks,
Kaniska

On Mon, Apr 25, 2016 at 11:12 AM, Maximilian Michels <[email protected]> wrote:
> Hi Kaniska,
>
> Not all of these are uniform across all Runners yet, but since you have
> previously deployed applications with the Flink Runner, here are my
> answers from a Flink perspective.
>
> ** Shutdown **
>
> For shutting down a Flink pipeline, you can use the "cancel" function:
>
> >./bin/flink cancel <jobId>
>
> When you submit your job in detached mode, e.g., ./bin/flink run -d
> /path/to/jar, you get a job id in return which you can use for the
> cancel command.
> Alternatively, query the running jobs via
>
> >./bin/flink list
>
> Very soon we will have checkpointing of sources/sinks in Beam. That
> will enable you to use Flink's Savepoint feature. Savepoints allow
> you to take a snapshot of your Flink job at a moment in time,
> shut down your application, and resume execution from that snapshot
> later. This works for Flink but not yet for Beam programs.
>
> ** Scheduling **
>
> You'll have to set up a cron job or an external scheduling program to
> run a Flink job at a specified time. There is no built-in pipeline
> scheduling in Flink.
>
> ** Monitoring **
>
> Flink has a nice web interface available on port 8081 of the job
> manager (master) node. It contains statistics like the number of
> records read/written per operator and JVM metrics.
>
> You may also register "Accumulators" which enable you to provide your
> own metrics. In the Beam API these are called "Aggregators".
> Aggregators get translated to Flink accumulators. For instance, you
> can have an aggregator that counts the number of records written by a
> particular operator.
>
> You can see these metrics on the web interface or access them via the
> Flink REST API:
>
> https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html
>
> Here is an example of an aggregator in Beam which counts the number of
> elements processed:
>
> public class TestAggregator {
>
>   public static void main(String[] args) throws AggregatorRetrievalException {
>
>     class MyDoFn extends DoFn<Integer, Integer> {
>
>       Aggregator<Long, Long> agg = createAggregator("numRecords", new Sum.SumLongFn());
>
>       @Override
>       public void processElement(ProcessContext c) throws Exception {
>         agg.addValue(1L);
>       }
>     }
>
>     FlinkPipelineOptions options =
>         PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     options.setRunner(FlinkPipelineRunner.class);
>     options.setStreaming(true);
>
>     Pipeline pipeline = Pipeline.create(options);
>
>     MyDoFn myDoFn = new MyDoFn();
>     pipeline.apply(Create.of(1, 2, 3)).apply(ParDo.of(myDoFn));
>
>     PipelineResult result = pipeline.run();
>
>     System.out.println("Result: " +
>         result.getAggregatorValues(myDoFn.agg).getValues());
>
>   }
> }
>
> As expected, this prints [3].
>
>
> Cheers,
> Max
>
> On Mon, Apr 25, 2016 at 7:25 PM, kaniska Mandal
> <[email protected]> wrote:
> > What's the recommended approach
> >
> >> to reliably shut down the pipeline
> >
> >> to run the beam-flink pipeline in a scheduled manner
> >
> >> to monitor the rates/throughputs/throttling/multiple threads spawned
> >> by the Pipeline, any suggestions?
> >
> > Thanks
> > Kaniska
>
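
For reference, here is a minimal sketch of the shutdown-hook idea from the graceful-shutdown questions at the top of this message. It uses only the JDK and is not Beam or Flink API: `ShutdownHookSketch`, `TrackedResource`, `closeOnShutdown` and `runWithCleanup` are hypothetical names, and `TrackedResource` merely stands in for something like a KafkaProducer. The assumption is that the pipeline is launched from a driver JVM we control. Note that a shutdown hook runs on normal JVM exit or SIGTERM, but not on `kill -9`, so closing the resource directly when the RuntimeException surfaces is probably the more dependable path.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHookSketch {

    /** Hypothetical stand-in for a closeable resource such as a Kafka producer. */
    public static final class TrackedResource implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            // compareAndSet keeps close() idempotent: the hook and the
            // catch block below may both call it.
            if (closed.compareAndSet(false, true)) {
                System.out.println("resource closed");
            }
        }

        public boolean isClosed() {
            return closed.get();
        }
    }

    /** Registers a JVM shutdown hook that closes the resource; returns the hook thread. */
    public static Thread closeOnShutdown(AutoCloseable resource) {
        Thread hook = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                System.err.println("cleanup failed: " + e);
            }
        }, "pipeline-cleanup");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    /** Runs the job; on a RuntimeException, closes the resource immediately and rethrows. */
    public static void runWithCleanup(Runnable job, AutoCloseable resource) {
        try {
            job.run();
        } catch (RuntimeException e) {
            try {
                resource.close();
            } catch (Exception closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        TrackedResource producer = new TrackedResource();
        closeOnShutdown(producer);
        try {
            // In a real driver this lambda would wrap pipeline.run().
            runWithCleanup(() -> {
                throw new RuntimeException("pipeline failed");
            }, producer);
        } catch (RuntimeException e) {
            System.out.println("job failed: " + e.getMessage());
        }
    }
}
```

The hook covers the case where the driver process is stopped from outside (e.g. by a supervisor sending SIGTERM), while the catch block covers the case where run() itself throws. Whether this also stops a producer running inside the Flink task managers is a separate question that this sketch does not address.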
