[
https://issues.apache.org/jira/browse/APEXCORE-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938145#comment-15938145
]
Tushar Gosavi commented on APEXCORE-649:
----------------------------------------
# Example use case
- One common use case is pushing application metrics to third-party monitoring
systems such as Graphite or OpenTSDB. See the `Example Plugin` below for how
this can be achieved.
- Another is acting on stats and events, for example killing the application
when a container fails. A plugin can do this by monitoring for a
container-killed event and then killing the application.
# Writing a Plugin
A user can define an Apex plugin by implementing DAGExecutionPlugin. The
important methods are listed below.
- **setup(DAGExecutionPluginContext)**
In setup, the plugin registers handlers for the events it is interested in. The
following events are supported.
- *ContainerHeartbeat* - The heartbeat from a StreamingContainer is passed to
the plugin for examination after it has been handled by the application master.
{code:language=java}
context.register(HEARTBEAT, new Handler<ContainerHeartbeat>() {
  @Override
  public void handle(ContainerHeartbeat chb) {
    ...
  }
});
{code}
- *StramEvent* - All Stram events generated by the platform can be monitored
by the plugin.
{code:language=java}
context.register(STRAM_EVENT, new Handler<StramEvent>() {
  @Override
  public void handle(StramEvent event) {
    ...
  }
});
{code}
- *Committed* - The plugin is notified when the committed windowId changes,
so that it can clean up cached data if required.
{code:language=java}
context.register(COMMIT_EVENT, new Handler<Long>() {
  @Override
  public void handle(Long wid) {
    ...
  }
});
{code}
- **teardown()**
Release any additional resources created by the plugin.
All handlers should be thread-safe, because the plugin execution environment
does not guarantee thread safety. Handlers should also not block for a long
time, as that could prevent other plugins from executing and may result in
dropped events if the queue is full.
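One way to keep a handler both thread-safe and non-blocking is to hand each event to a bounded queue and do the slow work on a separate thread. The following is a minimal sketch under that assumption; `AsyncHandoff` is a hypothetical helper and not part of the Apex API. A handler would only call `offer(...)`, which returns immediately and counts events it had to drop.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical helper, not part of Apex: decouples a fast handler from slow work. */
class AsyncHandoff<T> implements AutoCloseable
{
  private final BlockingQueue<T> queue;
  private final AtomicLong dropped = new AtomicLong();
  private final Thread worker;

  AsyncHandoff(int capacity, Consumer<T> slowConsumer)
  {
    queue = new ArrayBlockingQueue<>(capacity);
    worker = new Thread(() -> {
      try {
        while (true) {
          T event = queue.take();
          try {
            slowConsumer.accept(event);
          } catch (RuntimeException e) {
            // Keep the worker alive; a real plugin would log the failure here.
          }
        }
      } catch (InterruptedException e) {
        // close() interrupts the worker; exit quietly.
      }
    }, "plugin-handoff");
    worker.setDaemon(true);
    worker.start();
  }

  /** Safe to call from any thread and never blocks. Returns false if the event was dropped. */
  boolean offer(T event)
  {
    boolean accepted = queue.offer(event);
    if (!accepted) {
      dropped.incrementAndGet();
    }
    return accepted;
  }

  long droppedCount()
  {
    return dropped.get();
  }

  @Override
  public void close()
  {
    worker.interrupt();
  }
}
```

With this shape, a plugin would create the handoff in setup, call `offer` from its handlers, and call `close` in teardown. Dropping on a full queue is a design choice made here for illustration; a plugin that cannot lose events would need a different policy.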
# Loading of plugins
The plugins are loaded by the platform in the application master. Currently,
plugins are discovered in two ways.
1. Through the Java ServiceLoader mechanism.
In this case the user creates a jar of the plugin with a
META-INF/services/org.apache.apex.engine.api.DAGExecutionPlugin file in the
resource directory. This file should contain the fully qualified name
of the plugin class. (See
https://docs.oracle.com/javase/tutorial/ext/basics/spi.html)
2. By providing the class names of plugins in the application configuration file.
Alternatively, the user can provide the fully qualified name of the class
implementing the plugin in the application configuration file, as shown below.
{code:language=xml}
<property>
  <name>apex.plugin.stram.plugins</name>
  <value>{fully qualified class name of plugin}</value>
</property>
{code}
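The two discovery methods can be illustrated with plain Java. This is only a sketch of the general technique, not the platform's actual loading code: `SketchPlugin`, `NoopPlugin` and `PluginDiscovery` are hypothetical names, and treating the configured value as a comma-separated list is an assumption. For the first method, the services file for the example plugin in this comment would contain the single line `com.tugo.apex.plugins.GraphitePushPlugin`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/** Hypothetical stand-in for the plugin interface, so the sketch compiles on its own. */
interface SketchPlugin
{
  String name();
}

/** Hypothetical plugin used only to exercise the loader below. */
class NoopPlugin implements SketchPlugin
{
  @Override
  public String name()
  {
    return "noop";
  }
}

class PluginDiscovery
{
  static List<SketchPlugin> discover(String configuredClassNames)
  {
    List<SketchPlugin> plugins = new ArrayList<>();
    // Method 1: providers listed under META-INF/services/<interface name>.
    for (SketchPlugin plugin : ServiceLoader.load(SketchPlugin.class)) {
      plugins.add(plugin);
    }
    // Method 2: class names taken from configuration (comma-separated is an assumption).
    if (configuredClassNames != null) {
      for (String className : configuredClassNames.split(",")) {
        className = className.trim();
        if (className.isEmpty()) {
          continue;
        }
        try {
          Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
          plugins.add((SketchPlugin)instance);
        } catch (ReflectiveOperationException | ClassCastException e) {
          throw new IllegalArgumentException("cannot load plugin " + className, e);
        }
      }
    }
    return plugins;
  }
}
```

Both methods require the plugin class to have a no-argument constructor, which is why any configuration is read in setup rather than passed to the constructor.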
## Example Plugin
A sample plugin that pushes the container free memory metric to the Graphite
monitoring system is given below.
{code:language=java}
package com.tugo.apex.plugins;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;

import org.apache.apex.engine.api.DAGExecutionPlugin;
import org.apache.apex.engine.api.DAGExecutionPluginContext;

import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;

import static org.slf4j.LoggerFactory.getLogger;

public class GraphitePushPlugin implements DAGExecutionPlugin
{
  private static final Logger LOG = getLogger(GraphitePushPlugin.class);

  private String appName;
  private String host;
  private int port;
  private Socket socket;
  private OutputStream output;
  private boolean connected = false;

  @Override
  public void setup(DAGExecutionPluginContext context)
  {
    // Read the configuration first so the handler never sees unset fields.
    appName = context.getApplicationContext().getApplicationName();
    host = context.getLaunchConfig().get("graphite-host");
    port = Integer.parseInt(context.getLaunchConfig().get("graphite-port"));

    context.register(DAGExecutionPluginContext.HEARTBEAT,
        new DAGExecutionPluginContext.Handler<StreamingContainerUmbilicalProtocol.ContainerHeartbeat>()
        {
          @Override
          public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
          {
            handleHeartbeat(heartbeat);
          }
        });
  }

  // Opens the connection lazily; called again after a failure to reconnect.
  private synchronized void connect() throws IOException
  {
    if (!connected) {
      socket = new Socket(host, port);
      output = socket.getOutputStream();
      connected = true;
    }
  }

  // Flushes and closes the socket, then resets the connection state.
  private synchronized void disconnect()
  {
    if (socket != null) {
      try {
        if (output != null) {
          output.flush();
        }
        socket.close();
      } catch (IOException e) {
        LOG.warn("error while closing the socket", e);
      }
    }
    connected = false;
    output = null;
    socket = null;
  }

  // Synchronized because handlers may be invoked from more than one thread.
  private synchronized void handleHeartbeat(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
  {
    // Graphite plaintext format: "<metric path> <value> <timestamp in seconds>\n"
    StringBuilder builder = new StringBuilder(1024);
    builder.append(appName).append(".").append(heartbeat.getContainerId()).append(".").append("freeMemory")
        .append(" ").append(heartbeat.memoryMBFree)
        .append(" ").append(heartbeat.sentTms / 1000).append("\n");
    try {
      connect();
      if (output != null) {
        output.write(builder.toString().getBytes(StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      // Drop the connection; the next heartbeat will attempt to reconnect.
      LOG.warn("failed to push metric to Graphite", e);
      disconnect();
    }
  }

  @Override
  public void teardown()
  {
    disconnect();
  }
}
{code}
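The metric line built in the plugin follows Graphite's plaintext format: a dotted metric path, a value and a timestamp in seconds, separated by spaces and terminated by a newline. Factoring that formatting into a small pure function makes it easy to test without a socket. The sketch below is an illustration only; `GraphiteLine` is a hypothetical helper, and replacing whitespace and dots in name segments with underscores is an assumption made so that an application name cannot split the path or the line.

```java
/** Hypothetical helper, not part of the plugin above: formats one Graphite plaintext line. */
class GraphiteLine
{
  /** Replaces whitespace and dots, which Graphite treats as field and path separators. */
  static String sanitize(String segment)
  {
    return segment.replaceAll("[\\s.]+", "_");
  }

  /** Builds "<app>.<container>.<metric> <value> <timestamp in seconds>\n". */
  static String format(String appName, String containerId, String metric, long value, long timestampMillis)
  {
    return sanitize(appName) + "." + sanitize(containerId) + "." + metric
        + " " + value + " " + (timestampMillis / 1000) + "\n";
  }
}
```

For example, `GraphiteLine.format("my app.v2", "container_01", "freeMemory", 512, 1490000000123L)` yields the line `my_app_v2.container_01.freeMemory 512 1490000000` followed by a newline.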
> Infrastructure for user define stram event listeners.
> -----------------------------------------------------
>
> Key: APEXCORE-649
> URL: https://issues.apache.org/jira/browse/APEXCORE-649
> Project: Apache Apex Core
> Issue Type: Sub-task
> Reporter: Tushar Gosavi
> Assignee: Tushar Gosavi
>
> As suggested while working on the Visitor API, I have come up with the following
> proposal. The idea is to support user-defined DAG listeners. The plan is to
> support a limited set of events for now; more events could be added
> in the future.
> For details of the functionality provided, check the attached document.
> Please provide feedback on the proposal.
> https://docs.google.com/document/d/1SAIE0EjnCumrB1jKJSnbGvcml47Po8ZHFthfcbNJQgU/edit?usp=sharing