Hi Kaniska,

Pleas see my answers below.

>  Is it a feasible idea to maintain any Load Balancer in front of the 
> 'Beam-Flink Pipeline Executor Process' in order to control rate-limits / 
> throttling ?

No, that's currently not feasible/possible. Flink doesn't need
rate-limits due to its back pressure mechanism.

> Should we maintain multiple containers of 'Beam-Flink Pipelines' or a single 
> container with multiple instances of Beam-Flink Pipeline ?

You can run multiple Beam pipelines per Flink cluster instance. If you
run Yarn, then deploying a cluster for each pipeline is also easy to
do.

> Is it a good idea to use any external 'Process Flow Controller' like Apache 
> NiFi to wire the Beam Pipelines and launch / halt / resume them 
> programmatically / interactively ?

As far as I know the flow controler in NiFi would need to have an
integration with Flink which doesn't exist yet. I don't think it is
necessary but it depends on your use case.

> So is it good enough to add a shutdownhook on Beam Pipeline and close 
> resources like KafkaProducer when Pipeline Job is killed due to Runtime 
> Exception ?  It would have been better if a custom exception was thrown , so 
> that the Job could gracefully handle it !

Resources like the KafkaProducer are automatically closed by Flink
upon shutdown of the job. No need to setup your own shutdown hook. You
get the underlying exception of the RuntimeException using
exception.getCause().

> I understand we can use monitd to restart a process, but any suggestion to 
> implement external 'Beam Pipeline Monitoring Agent'  to auto-retry to restart 
> the Pipeline ?

If you enable execution retries in the FlinkPipelineOptions, the
pipeline will automatically restart upon failure. (see
setNumberOfExecutionRetries(retries)). You may also use the Rest
interface to implement your own monitoring / restart mechanism.

- Max

On Tue, Apr 26, 2016 at 8:12 AM, kaniska Mandal
<[email protected]> wrote:
> Hi Max,
>
> Many thanks for the great explanations.
>
> Few questions regarding 'Execution Strategy of Group of similar / disparate
> Beam Pipelines'
>
>>  Is it a feasible idea to maintain any Load Balancer in front of the
>> 'Beam-Flink Pipeline Executor Process' in order to control rate-limits /
>> throttling ?
>
>> Should we maintain multiple containers of 'Beam-Flink Pipelines' or a
>> single container with multiple instances of Beam-Flink Pipeline ?
>
>> Is it a good idea to use any external 'Process Flow Controller' like
>> Apache NiFi to wire the Beam Pipelines and launch / halt / resume them
>> programmatically / interactively ?
>
> More questions related to graceful shutdown and restart
>
> Currently Flink Pipeline Runner#run() throws Runtime Exception.
>
>> So is it good enough to add a shutdownhook on Beam Pipeline and close
>> resources like KafkaProducer when Pipeline Job is killed due to Runtime
>> Exception ?  It would have been better if a custom exception was thrown , so
>> that the Job could gracefully handle it !
>
> BTW, I tried calling close() on FlinkKafkaProucer form inside Beam-Flink
> Pipeline Runner , but the producer didn't stop.
>
>> I understand we can use monitd to restart a process, but any suggestion to
>> implement external 'Beam Pipeline Monitoring Agent'  to auto-retry to
>> restart the Pipeline ?
>
> Let me know if any of the above points sound logical, then I'll go ahead and
> create Feature request.
>
> Thanks,
> Kaniska
>
> On Mon, Apr 25, 2016 at 11:12 AM, Maximilian Michels <[email protected]> wrote:
>>
>> Hi Kaniska,
>>
>> Not all of these are uniform across all Runners yet but since you have
>> previously deployed applications with the Flink Runner, here are my
>> answers from a Flink perspective.
>>
>> ** Shutdown **
>>
>> For shutting down a Flink pipeline, you can use the "cancel" function:
>>
>> >./bin/flink cancel <jobId>
>>
>> When you submit your job in detached mode, e.g., ./bin/flink run -d
>> /path/to/jar you get a job id in return which you can use for the
>> cancel command. Alternatively, query the running jobs via
>>
>> >./bin/flink list
>>
>> Very soon we will have checkpointing of sources/sinks in Beam. That
>> would enable you to use Flink's Savepoint feature. Savepoints allow
>> you to take a snapshot of your Flink job at a moment and time,
>> shutdown your application, and resume execution at that snapshot later
>> in time. This works for Flink but not yet for Beam programs.
>>
>> ** Scheduling **
>>
>> You'll have to setup a cron job or an external scheduling program to
>> run a Flink job at a specified time. There is no built-in pipeline
>> scheduling in Flink.
>>
>> ** Monitoring **
>>
>> Flink has a nice web interface available on port 8081 of the job
>> manager (master) node. It contains statistics like the number of
>> records read/written per operator and JVM metrics.
>>
>> You may also register "Accumulators" which enable you to provide your
>> own metrics. In the Beam API these are called "Aggregators".
>> Aggregators get translated to Flink accumulators. For instance, you
>> can have an aggregator that counts the number of records written by a
>> particular operator.
>>
>> You can see these metrics on the web interface or access them via the
>> Flink Rest API:
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html
>>
>> Here is an example of an aggregator in Beam which counts the number of
>> elements processed:
>>
>> public class TestAggregator {
>>
>>   public static void main(String[] args) throws
>> AggregatorRetrievalException {
>>
>>     class MyDoFn extends DoFn<Integer, Integer> {
>>
>>       Aggregator<Long, Long> agg = createAggregator("numRecords", new
>> Sum.SumLongFn());
>>
>>       @Override
>>       public void processElement(ProcessContext c) throws Exception {
>>         agg.addValue(1L);
>>       }
>>     }
>>
>>     FlinkPipelineOptions options =
>> PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>>     options.setRunner(FlinkPipelineRunner.class);
>>     options.setStreaming(true);
>>
>>     Pipeline pipeline = Pipeline.create(options);
>>
>>     MyDoFn myDoFn = new MyDoFn();
>>     pipeline.apply(Create.of(1, 2, 3)).apply(ParDo.of(myDoFn));
>>
>>     PipelineResult result = pipeline.run();
>>
>>     System.out.println("Result: " +
>> result.getAggregatorValues(myDoFn.agg).getValues());
>>
>>   }
>> }
>>
>> As expected, this prints [3].
>>
>>
>> Cheers,
>> Max
>>
>> On Mon, Apr 25, 2016 at 7:25 PM, kaniska Mandal
>> <[email protected]> wrote:
>> > Whats the recommended approach
>> >
>> >> to reliably shut down the pipeline
>> >
>> >> to run the beam-flink pipeline in a scheduled manner
>> >
>> >> tomonitor the rates/throughputs/throttling/multiple threads spawned  -
>> >> by
>> >> Pipeline , any suggestion ?
>> >
>> > Thanks
>> > Kaniska
>
>

Reply via email to