http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/deployment-ui-authentication.md
----------------------------------------------------------------------
diff --git a/docs/contents/deployment/deployment-ui-authentication.md b/docs/contents/deployment/deployment-ui-authentication.md
new file mode 100644
index 0000000..0990192
--- /dev/null
+++ b/docs/contents/deployment/deployment-ui-authentication.md
@@ -0,0 +1,290 @@

## What is this about?

This page describes how to enable and configure authentication for the Gearpump UI dashboard.

## How to enable UI authentication?

1. Edit the config file gear.conf: find the entry `gearpump-ui.gearpump.ui-security.authentication-enabled` and change its value to true:

        :::bash
        gearpump-ui.gearpump.ui-security.authentication-enabled = true

    Restart the UI dashboard, and UI authentication is enabled. The dashboard will then prompt for a username and password.

## How many authentication methods does the Gearpump UI server support?

Currently, it supports:

1. Username-Password based authentication and
2. OAuth2 based authentication.

Username-Password based authentication is enabled when `gearpump-ui.gearpump.ui-security.authentication-enabled` is set to true,
 and **CANNOT** be disabled.

The UI server admin can also choose to enable an **auxiliary** OAuth2 authentication channel.

## User-Password based authentication

User-Password based authentication covers all authentication scenarios which require the
user to enter an explicit username and password.

Gearpump provides a built-in ConfigFileBasedAuthenticator which verifies the username and password
against password hashes stored in config files.

However, developers can choose to extend `org.apache.gearpump.security.Authenticator` to provide a custom
User-Password based authenticator, for example to support LDAP, Kerberos, or database-based authentication.

### ConfigFileBasedAuthenticator: built-in User-Password Authenticator

ConfigFileBasedAuthenticator stores all usernames and password hashes in the configuration file gear.conf.
Here are the steps to configure ConfigFileBasedAuthenticator.

#### How to add or remove users?

The default authentication plugin has three categories of users: admins, users, and guests.

* admins: have unlimited permissions, such as shutting down the cluster and adding/removing machines.
* users: have limited permissions, such as submitting an application.
* guests: cannot submit/kill applications, but can view the application status.

The system administrator can add or remove users by updating the config file `conf/gear.conf`.

Suppose we want to add user jerry as an administrator; here are the steps:

1. Pick a password, and generate the digest for this password. Suppose we use password `ilovegearpump`;
   to generate the digest:

        :::bash
        bin/gear org.apache.gearpump.security.PasswordUtil -password ilovegearpump

    It will generate a digest value like this:

        :::bash
        CgGxGOxlU8ggNdOXejCeLxy+isrCv0TrS37HwA==

2. Edit the config file conf/gear.conf at path `gearpump-ui.gearpump.ui-security.config-file-based-authenticator.admins`,
   and add user `jerry` to this list:

        :::bash
        admins = {
          ## Default Admin. Username: admin, password: admin
          ## !!! Please replace this builtin account for production cluster for security reason. !!!
          "admin" = "AeGxGOxlU8QENdOXejCeLxy+isrCv0TrS37HwA=="
          "jerry" = "CgGxGOxlU8ggNdOXejCeLxy+isrCv0TrS37HwA=="
        }

3. Restart the UI dashboard with `bin/services` to make the change effective.

4. The "admins" group has unlimited permissions; you may want to restrict the permissions instead. In that case
   you can modify `gearpump-ui.gearpump.ui-security.config-file-based-authenticator.users` or
   `gearpump-ui.gearpump.ui-security.config-file-based-authenticator.guests`.

5. See the description in `conf/gear.conf` for more information.

#### What is the default user and password?

For ConfigFileBasedAuthenticator, the Gearpump distribution ships with two default users:

1. username: admin, password: admin
2.
username: guest, password: guest

User `admin` has unlimited permissions, while `guest` can only view the application status.

For security reasons, you should remove the default users `admin` and `guest` on a production cluster.

#### Is this secure?

Gearpump does NOT store any user password in any form, so only the user himself knows the password.
A one-way hash digest is used to verify the password the user enters.

### How to develop a custom User-Password Authenticator for LDAP, database, etc.

If developers choose to define their own User-Password based authenticator, they need to
 modify the configuration option:

    :::bash
    ## Replace "org.apache.gearpump.security.CustomAuthenticator" with your real authenticator class.
    gearpump.ui-security.authenticator = "org.apache.gearpump.security.CustomAuthenticator"

Make sure CustomAuthenticator extends interface:

    :::scala
    trait Authenticator {

      def authenticate(user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult]
    }

## OAuth2 based authentication

OAuth2 based authentication is commonly used to achieve social login with a social network account.

Gearpump provides generic OAuth2 authentication support which allows users to extend it to support new authentication sources.

Basically, OAuth2 based authentication contains these steps:

 1. The user accesses the Gearpump UI website, and chooses to log in with an OAuth2 server.
 2. The Gearpump UI website redirects the user to the OAuth2 server's authorization endpoint.
 3. The end user completes the authorization in the domain of the OAuth2 server.
 4. The OAuth2 server redirects the user back to the Gearpump UI server.
 5. The Gearpump UI server verifies the tokens and extracts credentials from query
    parameters and form fields.
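The redirect in step 2 above is a standard RFC 6749 authorization request. As a rough sketch (the endpoint, client id, and callback URL below are placeholders for illustration, not real Gearpump values), the URL the UI server sends the browser to is built like this:

```shell
# Sketch only: build an RFC 6749 authorization URL (step 2 of the flow above).
# All concrete values here are placeholders.
build_authorize_url() {
  local endpoint="$1" client_id="$2" redirect_uri="$3"
  printf '%s?response_type=code&client_id=%s&redirect_uri=%s&scope=openid' \
    "$endpoint" "$client_id" "$redirect_uri"
}

build_authorize_url "https://oauth2.example.com/authorize" \
  "my_client_id" "http://127.0.0.1:8090/login/oauth2/example/callback"
```

After the user approves, the OAuth2 server calls the `redirect_uri` back with a `code` query parameter, which the UI server exchanges for tokens (step 5).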
### Terminologies

For terms like client ID and client secret, please refer to [RFC 6749](https://tools.ietf.org/html/rfc6749).

### Enable web proxy for UI server

To enable OAuth2 authentication, the Gearpump UI server must have network access to the OAuth2 server, as
 some requests are initiated directly inside the Gearpump UI server. So, if you are behind a firewall, make
 sure you have configured the proxy properly for the UI server.

#### If you are on Windows

    :::bash
    set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com -Dhttps.proxyPort=8088
    bin/services

#### If you are on Linux

    :::bash
    export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com -Dhttps.proxyPort=8088"
    bin/services

### Google Plus OAuth2 Authenticator

The Google Plus OAuth2 Authenticator authenticates against the Google OAuth2 service. It extracts the email address
from the Google user profile as the credential.

To use the Google OAuth2 Authenticator, there are several steps:

1. Register your application (the Gearpump UI server here) in the Google developer console.
2. Configure the Google OAuth2 information in gear.conf.
3. Configure a network proxy for the Gearpump UI server if applicable.

#### Step1: Register your website as an OAuth2 Application on Google

1. Create an application representing your website at [https://console.developers.google.com](https://console.developers.google.com)
2. In "API Manager" of your created application, enable the "Google+ API".
3. Create an OAuth client ID for this application. In the "Credentials" tab of "API Manager",
choose "Create credentials", and then select OAuth client ID. Follow the wizard
to set the callback URL, and generate the client ID and client secret.

**NOTE:** The callback URL is NOT optional.

#### Step2: Configure the OAuth2 information in gear.conf

1.
Enable OAuth2 authentication by setting `gearpump.ui-security.oauth2-authenticator-enabled` to true.
2. Configure the section `gearpump.ui-security.oauth2-authenticators.google` in gear.conf. Please make sure the
class name, client ID, client secret, and callback URL are set properly.

**NOTE:** The callback URL set here should match what is configured on Google in step1.

#### Step3: Configure the network proxy if applicable

To enable OAuth2 authentication, the Gearpump UI server must have network access to the Google service, as
 some requests are initiated directly inside the Gearpump UI server. So, if you are behind a firewall, make
 sure you have configured the proxy properly for the UI server.

For how to configure a web proxy for the UI server, please refer to the section "Enable web proxy for UI server" above.

#### Step4: Restart the UI server and try to click the Google login icon on the UI server.

### CloudFoundry UAA server OAuth2 Authenticator

CloudFoundryUaaAuthenticator authenticates against the CloudFoundry UAA OAuth2 service. It extracts the email address
 from the UAA user profile as the credential.

For what UAA (User Account and Authentication Service) is, please see the guide: [UAA](https://github.com/cloudfoundry/uaa)

To use the CloudFoundry UAA OAuth2 Authenticator, there are several steps:

1. Register your application (the Gearpump UI server here) to UAA with the helper tool `uaac`.
2. Configure the UAA OAuth2 information in gear.conf.
3. Configure a network proxy for the Gearpump UI server if applicable.

#### Step1: Register your application to UAA with `uaac`

1. Check the tutorial on uaac at [https://docs.cloudfoundry.org/adminguide/uaa-user-management.html](https://docs.cloudfoundry.org/adminguide/uaa-user-management.html)
2. Open a bash shell, and set the target UAA server with command `uaac target`:

        :::bash
        uaac target [your uaa server url]

3. Log in as user admin with:

        :::bash
        uaac token client get admin -s MyAdminPassword

4.
Create a new application (client) in UAA:

        :::bash
        uaac client add [your_client_id] \
          --scope "openid cloud_controller.read" \
          --authorized_grant_types "authorization_code client_credentials refresh_token" \
          --authorities "openid cloud_controller.read" \
          --redirect_uri [your_redirect_url] \
          --autoapprove true \
          --secret [your_client_secret]

#### Step2: Configure the OAuth2 information in gear.conf

1. Enable OAuth2 authentication by setting `gearpump.ui-security.oauth2-authenticator-enabled` to true.
2. Configure the section `gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa` in gear.conf.
Please make sure the class name, client ID, client secret, and callback URL are set properly.

**NOTE:** The callback URL here should match what you set on CloudFoundry UAA in step1.

#### Step3: Configure a network proxy for the Gearpump UI server if applicable

To enable OAuth2 authentication, the Gearpump UI server must have network access to the UAA service, as
 some requests are initiated directly inside the Gearpump UI server. So, if you are behind a firewall, make
 sure you have configured the proxy properly for the UI server.

For how to configure a web proxy for the UI server, please refer to the section "Enable web proxy for UI server" above.

#### Step4: Restart the UI server and try to click the CloudFoundry login icon on the UI server.

#### Step5: You can also enable an additional authenticator for CloudFoundry UAA by setting the config:

    :::bash
    additional-authenticator-enabled = true

Please see the description in gear.conf for more information.

#### Extend OAuth2Authenticator to support a new authorization service like Facebook or Twitter

You can follow the Google OAuth2 example code to define a custom OAuth2Authenticator. Basically, the steps include:

1. Define an OAuth2Authenticator implementation.

2.
Add a configuration entry under `gearpump.ui-security.oauth2-authenticators`. For example:

        ## name of this authenticator
        "socialnetworkx" {
          "class" = "org.apache.gearpump.services.security.oauth2.impl.SocialNetworkXAuthenticator"

          ## Please make sure this URL matches the name
          "callback" = "http://127.0.0.1:8090/login/oauth2/socialnetworkx/callback"

          "clientId" = "gearpump_test2"
          "clientSecret" = "gearpump_test2"
          "defaultUserRole" = "guest"

          ## Make sure socialnetworkx.png exists under dashboard/icons
          "icon" = "/icons/socialnetworkx.png"
        }

    The configuration entry is meant to be consumed by the class `SocialNetworkXAuthenticator`.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/deployment-yarn.md
----------------------------------------------------------------------
diff --git a/docs/contents/deployment/deployment-yarn.md b/docs/contents/deployment/deployment-yarn.md
new file mode 100644
index 0000000..401fa46
--- /dev/null
+++ b/docs/contents/deployment/deployment-yarn.md
@@ -0,0 +1,135 @@

## How to launch a Gearpump cluster on YARN

1. Upload `gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip` to a remote HDFS folder; we suggest putting it at `/usr/lib/gearpump/gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip`.

2. Make sure the home directory on HDFS is already created and all read-write rights are granted to the user. For example, user gear's home directory is `/user/gear`.

3. Put the YARN configuration files on the classpath.
   Before calling `yarnclient launch`, make sure you have put all YARN configuration files on the classpath. Typically, you can just copy all files under `$HADOOP_HOME/etc/hadoop` from one of the YARN cluster machines to `conf/yarnconf` of Gearpump. `$HADOOP_HOME` points to the Hadoop installation directory.

4. Launch the Gearpump cluster on YARN:

        :::bash
        yarnclient launch -package /usr/lib/gearpump/gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip

    If you don't specify the package path, it will read the default package path (`gearpump.yarn.client.package-path`) from `gear.conf`.

    **NOTE:** You may need to execute `chmod +x bin/*` in a shell to make the script file `yarnclient` executable.

5. After launching, you can browse the Gearpump UI via the YARN resource manager dashboard.

## How to configure the resource limitation of a Gearpump cluster

Before launching a Gearpump cluster, please change the configuration section `gearpump.yarn` in `gear.conf` to configure the resource limitation, such as:

1. The number of worker containers.
2. The YARN container memory size for workers and masters.
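As a sketch, such a `gearpump.yarn` section might look like the fragment below. `gearpump.yarn.client.package-path` is the key referenced above; the worker/master keys are illustrative only, and the authoritative key names and defaults are in the comments of your distribution's `gear.conf`.

```
gearpump {
  yarn {
    client {
      ## HDFS path of the Gearpump package (see step 4 above)
      package-path = "/usr/lib/gearpump/gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip"
    }
    ## Illustrative resource settings; verify the exact key names in gear.conf
    master {
      containers = 1
      memory = "512M"
    }
    worker {
      containers = 4
      memory = "4096M"
    }
  }
}
```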
## How to submit an application to the Gearpump cluster

To submit a jar to the Gearpump cluster, we first need to know the Master address, so we need to get
an active configuration file first.

There are two ways to get an active configuration file:

1. Option 1: specify the "-output" option when you launch the cluster.

        :::bash
        yarnclient launch -package /usr/lib/gearpump/gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip -output /tmp/mycluster.conf

    It will print to the console something like:

        :::bash
        ==Application Id: application_1449802454214_0034

2. Option 2: query the active configuration file.

        :::bash
        yarnclient getconfig -appid <yarn application id> -output /tmp/mycluster.conf

    The YARN application id can be found in the output of step1 or on the YARN dashboard.

3. After you have downloaded the configuration file, you can launch an application with that config file:

        :::bash
        gear app -jar examples/wordcount-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.jar -conf /tmp/mycluster.conf

4. To run a Storm application over Gearpump on YARN, store the configuration file with `-output application.conf`
   and then launch the Storm application with:

        :::bash
        storm -jar examples/storm-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.jar storm.starter.ExclamationTopology exclamation

5. Now the application is running. To check this:

        :::bash
        gear info -conf /tmp/mycluster.conf

6. To start a UI server, run:

        :::bash
        services -conf /tmp/mycluster.conf

    The default username and password are "admin:admin"; check [UI Authentication](deployment-ui-authentication) to find out how to manage users.

## How to add/remove machines dynamically

The Gearpump YARN tool allows you to dynamically add/remove machines. Here are the steps:

1. First, query the active resources:

        :::bash
        yarnclient query -appid <yarn application id>

    The console output shows how many workers and masters there are.
For example, the output may look like this:

        :::bash
        masters:
        container_1449802454214_0034_01_000002(IDHV22-01:35712)
        workers:
        container_1449802454214_0034_01_000003(IDHV22-01:35712)
        container_1449802454214_0034_01_000006(IDHV22-01:35712)

2. To add new worker machines, run:

        :::bash
        yarnclient addworker -appid <yarn application id> -count 2

    This will add two new worker machines. Run the command in the first step to check whether the change took effect.

3. To remove old machines, use:

        :::bash
        yarnclient removeworker -appid <yarn application id> -container <worker container id>

    The worker container id can be found in the output of step 1. For example, "container_1449802454214_0034_01_000006" is a valid container id.

## Other usage

1. To kill a cluster:

        :::bash
        yarnclient kill -appid <yarn application id>

    **NOTE:** If the application was not launched successfully, this command won't work. Please use `yarn application -kill <appId>` instead.

2. To check the Gearpump version:

        :::bash
        yarnclient version -appid <yarn application id>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/get-gearpump-distribution.md
----------------------------------------------------------------------
diff --git a/docs/contents/deployment/get-gearpump-distribution.md b/docs/contents/deployment/get-gearpump-distribution.md
new file mode 100644
index 0000000..8a31113
--- /dev/null
+++ b/docs/contents/deployment/get-gearpump-distribution.md
@@ -0,0 +1,83 @@

### Prepare the binary

You can either download a pre-built release package or build from source code.

#### Download Release Binary

If you choose to use a pre-built package, you don't need to build from source code.
The release package can be downloaded from:

##### [Download page](http://gearpump.incubator.apache.org/downloads.html)

#### Build from Source code

If you choose to build the package from source code yourself, you can follow these steps:

1). Clone the Gearpump repository

    :::bash
    git clone https://github.com/apache/incubator-gearpump.git
    cd incubator-gearpump

2). Build the package

    :::bash
    ## Please use scala 2.11
    ## The target package path: output/target/gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip
    sbt clean assembly packArchiveZip

    After the build, a package file gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip is generated under the output/target/ folder.

    **NOTE:**
    Please set the JAVA_HOME environment variable before the build.

    On Linux:

    :::bash
    export JAVA_HOME={path/to/jdk/root/path}

    On Windows:

    :::bash
    set JAVA_HOME={path/to/jdk/root/path}

    **NOTE:**
The build requires a network connection. If you are behind an enterprise proxy, make sure you have set the proxy in your environment before running the build commands.
For Windows:

    :::bash
    set HTTP_PROXY=http://host:port
    set HTTPS_PROXY=http://host:port

For Linux:

    :::bash
    export HTTP_PROXY=http://host:port
    export HTTPS_PROXY=http://host:port

### Gearpump package structure

You need to extract the `.zip` file to use it. On Linux, you can run:

    :::bash
    unzip gearpump-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}.zip

After decompression, the directory structure looks like picture 1.

Under the bin/ folder, there are script files for Linux (bash scripts) and Windows (.bat scripts).

script | function
--------|------------
local | You can start the Gearpump cluster in a single JVM (local mode), or in a distributed cluster (cluster mode). To start the cluster in local mode, you can use the local/local.bat helper script; it is very useful for development and troubleshooting.
master | To start Gearpump in cluster mode, you need to start one or more master nodes, which represent the global resource management center. master/master.bat is the launcher script to boot a master node.
worker | To start Gearpump in cluster mode, you also need to start several workers, with each worker representing a set of local resources. worker/worker.bat is the launcher script to start a worker node.
services | This script is used to start the backend REST service and other services for the frontend UI dashboard (default user "admin, admin").

Please check [Command Line Syntax](../introduction/commandline) for more information on each script.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/deployment/hardware-requirement.md
----------------------------------------------------------------------
diff --git a/docs/contents/deployment/hardware-requirement.md b/docs/contents/deployment/hardware-requirement.md
new file mode 100644
index 0000000..dfe765b
--- /dev/null
+++ b/docs/contents/deployment/hardware-requirement.md
@@ -0,0 +1,30 @@

### Pre-requisite

A Gearpump cluster can be installed on Windows and Linux.

Before installation, you need to decide how many machines will run this cluster.

For each machine, the requirements are listed in the table below.

**Table: Environment requirement on a single machine**

Resource | Requirements
------------ | ---------------------------
Memory | 2GB free memory is required to run the cluster. For any production system, 32GB memory is recommended.
Java | JRE 6 or above
User permission | Root permission is not required
Network | Ethernet (TCP/IP)
CPU | Nothing special
HDFS installation | Not required by default. You only need to install it when you want to store application jars in HDFS.
Kafka installation | Not required by default. You need to install Kafka when you want the at-least-once message delivery feature.
Currently, the only supported data source for this feature is Kafka.

**Table: The default ports used in Gearpump:**

| usage | Port | Description |
------------ | ---------------|------------
Dashboard UI | 8090 | Web UI.
Dashboard web socket service | 8091 | UI backend web socket service for long connections.
Master port | 3000 | Every other role (worker, appmaster, executor, user) uses this port to communicate with the Master.

You need to ensure that your firewall does not block these ports so that Gearpump can work correctly.
You can also modify the port configuration; check the [Configuration](deployment-configuration) section for details.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/dev/dev-connectors.md
----------------------------------------------------------------------
diff --git a/docs/contents/dev/dev-connectors.md b/docs/contents/dev/dev-connectors.md
new file mode 100644
index 0000000..01deb3e
--- /dev/null
+++ b/docs/contents/dev/dev-connectors.md
@@ -0,0 +1,237 @@

## Basic Concepts

`DataSource` and `DataSink` are the two main concepts Gearpump uses to connect with the outside world.

### DataSource

`DataSource` is the starting point of a streaming processing flow.

### DataSink

`DataSink` is the end point of a streaming processing flow.

## Implemented Connectors

### `DataSource` implemented

Currently, the following `DataSource`s are supported:

Name | Description
-----| ----------
`CollectionDataSource` | Convert a collection to a recurring data source. E.g. `seq(1, 2, 3)` will output `1,2,3,1,2,3...`.
`KafkaSource` | Read from Kafka.

### `DataSink` implemented

Currently, the following `DataSink`s are supported:

Name | Description
-----| ----------
`HBaseSink` | Write the message to HBase. The message to write must be an HBase `Put` or a tuple of `(rowKey, family, column, value)`.
`KafkaSink` | Write to Kafka.
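The replay behavior of `CollectionDataSource` can be pictured with a plain Scala iterator. This is a conceptual sketch only, not the Gearpump API:

```scala
// Conceptual sketch of CollectionDataSource semantics (not the Gearpump API):
// the source keeps replaying its collection, so Seq(1, 2, 3) is emitted as
// 1,2,3,1,2,3,...
val recurring: Iterator[Int] = Iterator.continually(Seq(1, 2, 3)).flatten
println(recurring.take(7).toList) // List(1, 2, 3, 1, 2, 3, 1)
```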
+ +## Use of Connectors + +### Use of Kafka connectors + +To use Kafka connectors in your application, you first need to add the `gearpump-external-kafka` library dependency in your application: + +#### SBT + + :::sbt + "org.apache.gearpump" %% "gearpump-external-kafka" % {{GEARPUMP_VERSION}} + +#### XML + + :::xml + <dependency> + <groupId>org.apache.gearpump</groupId> + <artifactId>gearpump-external-kafka</artifactId> + <version>{{GEARPUMP_VERSION}}</version> + </dependency> + + +This is a simple example to read from Kafka and write it back using `KafkaSource` and `KafkaSink`. Users can optionally set a `CheckpointStoreFactory` such that Kafka offsets are checkpointed and at-least-once message delivery is guaranteed. + +#### Low level API + + :::scala + val appConfig = UserConfig.empty + val props = new Properties + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName) + val source = new KafkaSource(sourceTopic, props) + val checkpointStoreFactory = new KafkaStoreFactory(props) + source.setCheckpointStore(checkpointStoreFactory) + val sourceProcessor = DataSourceProcessor(source, sourceNum) + val sink = new KafkaSink(sinkTopic, props) + val sinkProcessor = DataSinkProcessor(sink, sinkNum) + val partitioner = new ShufflePartitioner + val computation = sourceProcessor ~ partitioner ~> sinkProcessor + val app = StreamApplication(appName, Graph(computation), appConfig) + +#### High level API + + :::scala + val props = new Properties + val appName = "KafkaDSL" + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName) + + val app = StreamApp(appName, context) + + if (atLeastOnce) { + val checkpointStoreFactory = new KafkaStoreFactory(props) + KafkaDSL.createAtLeastOnceStream(app, 
sourceTopic, checkpointStoreFactory, props, sourceNum)
        .writeToKafka(sinkTopic, props, sinkNum)
    } else {
      KafkaDSL.createAtMostOnceStream(app, sourceTopic, props, sourceNum)
        .writeToKafka(sinkTopic, props, sinkNum)
    }

In the above example, configurations are set through Java properties and shared by `KafkaSource`, `KafkaSink` and `KafkaCheckpointStoreFactory`.
Their configurations can also be defined separately, as below.

#### `KafkaSource` configurations

Name | Descriptions | Type | Default
---- | ------------ | ---- | -------
`KafkaConfig.ZOOKEEPER_CONNECT_CONFIG` | Zookeeper connect string for Kafka topics management | String |
`KafkaConfig.CLIENT_ID_CONFIG` | An id string to pass to the server when making requests | String | ""
`KafkaConfig.GROUP_ID_CONFIG` | A string that uniquely identifies a set of consumers within the same consumer group | String | ""
`KafkaConfig.FETCH_SLEEP_MS_CONFIG` | The amount of time (ms) to sleep when hitting fetch.threshold | Int | 100
`KafkaConfig.FETCH_THRESHOLD_CONFIG` | Size of internal queue to keep Kafka messages.
Stop fetching and go to sleep when hitting the threshold | Int | 10000 +`KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG` | Partition grouper class to group partitions among source tasks | Class | DefaultPartitionGrouper +`KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG` | Message decoder class to decode raw bytes from Kafka | Class | DefaultMessageDecoder +`KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG` | Timestamp filter class to filter out late messages | Class | DefaultTimeStampFilter + + +#### `KafkaSink` configurations + +Name | Descriptions | Type | Default +---- | ------------ | ---- | ------- +`KafkaConfig.BOOTSTRAP_SERVERS_CONFIG` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster | String | +`KafkaConfig.CLIENT_ID_CONFIG` | An id string to pass to the server when making requests | String | "" + +#### `KafkaCheckpointStoreFactory` configurations + +Name | Descriptions | Type | Default +---- | ------------ | ---- | ------- +`KafkaConfig.ZOOKEEPER_CONNECT_CONFIG` | Zookeeper connect string for Kafka topics management | String | +`KafkaConfig.BOOTSTRAP_SERVERS_CONFIG` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster | String | +`KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX` | Name prefix for checkpoint store | String | "" +`KafkaConfig.REPLICATION_FACTOR` | Replication factor for checkpoint store topic | Int | 1 + +### Use of `HBaseSink` + +To use `HBaseSink` in your application, you first need to add the `gearpump-external-hbase` library dependency in your application: + +#### SBT + + :::sbt + "org.apache.gearpump" %% "gearpump-external-hbase" % {{GEARPUMP_VERSION}} + +#### XML + :::xml + <dependency> + <groupId>org.apache.gearpump</groupId> + <artifactId>gearpump-external-hbase</artifactId> + <version>{{GEARPUMP_VERSION}}</version> + </dependency> + + +To connect to HBase, you need to provide following info: + + * the HBase configuration to tell which HBase service to connect + * the 
table name (you must create the table yourself; see the [HBase documentation](https://hbase.apache.org/book.html))

Then, you can use `HBaseSink` in your application:

    :::scala
    //create the HBase data sink
    val sink = HBaseSink(UserConfig.empty, tableName, HBaseConfiguration.create())

    //create Gearpump Processor
    val sinkProcessor = DataSinkProcessor(sink, parallelism)

    :::scala
    //assume stream is a normal `Stream` in DSL
    stream.writeToHbase(UserConfig.empty, tableName, parallelism, "write to HBase")

You can tune the connection to HBase via the HBase configuration passed in. If none is passed, Gearpump will check the local classpath for a valid HBase configuration (`hbase-site.xml`).

Note that due to the issue discussed [here](http://stackoverflow.com/questions/24456484/hbase-managed-zookeeper-suddenly-trying-to-connect-to-localhost-instead-of-zooke) you may need to create additional configuration for your HBase sink:

    :::scala
    def hadoopConfig = {
      val conf = new Configuration()
      conf.set("hbase.zookeeper.quorum", "zookeeperHost")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf
    }
    val sink = HBaseSink(UserConfig.empty, tableName, hadoopConfig)

## How to implement your own `DataSource`

To implement your own `DataSource`, you need to implement two things:

1. The data source itself
2. A helper class to ease its usage in the DSL

### Implement your own `DataSource`

You need to implement a class derived from `org.apache.gearpump.streaming.transaction.api.TimeReplayableSource`.

### Implement the DSL helper (Optional)

If you would like to use your custom source from the DSL, it is better to implement your own DSL helper.
You can refer to `KafkaDSLUtil` as an example in the Gearpump source.
Below is a code snippet from `KafkaDSLUtil`:

    :::scala
    object KafkaDSLUtil {

      def createStream[T](
          app: StreamApp,
          topics: String,
          parallelism: Int,
          description: String,
          properties: Properties): dsl.Stream[T] = {
        app.source[T](new KafkaSource(topics, properties), parallelism, description)
      }
    }

## How to implement your own `DataSink`

To implement your own `DataSink`, you need to implement two things:

1. The data sink itself
2. A helper class to ease its usage in the DSL

### Implement your own `DataSink`

You need to implement a class derived from `org.apache.gearpump.streaming.sink.DataSink`.

### Implement the DSL helper (Optional)

If you would like to use your custom sink from the DSL, it is better to implement your own DSL helper.
You can refer to `HBaseDSLSink` as an example in the Gearpump source.

Below is a code snippet from `HBaseDSLSink`:

    :::scala
    class HBaseDSLSink[T](stream: Stream[T]) {
      def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, description: String): Stream[T] = {
        stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, description)
      }

      def writeToHbase(userConfig: UserConfig, configuration: Configuration, table: String, parallism: Int, description: String): Stream[T] = {
        stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, userConfig, description)
      }
    }

    object HBaseDSLSink {
      implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = {
        new HBaseDSLSink[T](stream)
      }
    }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/dev/dev-custom-serializer.md
----------------------------------------------------------------------
diff --git a/docs/contents/dev/dev-custom-serializer.md b/docs/contents/dev/dev-custom-serializer.md
new file mode 100644
index 0000000..b1abeda
--- /dev/null
+++ b/docs/contents/dev/dev-custom-serializer.md
@@ -0,0 +1,137 @@
+Gearpump has a built-in serialization framework with a shaded Kryo version, which allows you to customize how a specific message type is serialized. + +#### Register a class before serialization. + +Note: to use the built-in Kryo serialization framework, Gearpump requires all classes to be registered explicitly before use, whether or not you provide a custom serializer. If no custom serializer is given, Gearpump uses the default `com.esotericsoftware.kryo.serializers.FieldSerializer` to serialize the class. + +To register a class, you need to change the configuration file gear.conf (or application.conf if you want it to take effect only for a single application). + + :::json + gearpump {
 + serializers { + ## We will use default FieldSerializer to serialize this class type + "org.apache.gearpump.UserMessage" = "" + + ## we will use custom serializer to serialize this class type + "org.apache.gearpump.UserMessage2" = "org.apache.gearpump.UserMessageSerializer" + } + } + + +#### How to define a custom serializer for the built-in kryo serialization framework + +When you decide that you want to define a custom serializer, you can do this in two ways. + +Please note that Gearpump shades the original Kryo dependency. The package name ```com.esotericsoftware``` was relocated to ```org.apache.gearpump.esotericsoftware```. So in the following customizations, you should import the corresponding shaded classes, as the example code shows. + +In general you should use the shaded version of a library whenever possible in order to avoid binary incompatibilities, e.g. don't use: + + :::scala + import com.google.common.io.Files + + +but rather + + :::scala + import org.apache.gearpump.google.common.io.Files + + +##### System Level Serializer + +If the serializer is widely used, you can define a global serializer which is available to all applications (as well as workers and masters) in the system. 
+ +###### Step1: First develop a java library which contains the custom serializer class. Here is an example: + + :::scala + package org.apache.gearpump + + import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer} + import org.apache.gearpump.esotericsoftware.kryo.io.{Input, Output} + + // fields must be vals so the serializer can read them back + class UserMessage(val longField: Long, val intField: Int) + + class UserMessageSerializer extends Serializer[UserMessage] { + override def write(kryo: Kryo, output: Output, obj: UserMessage) = { + output.writeLong(obj.longField) + output.writeInt(obj.intField) + } + + override def read(kryo: Kryo, input: Input, typ: Class[UserMessage]): UserMessage = { + val longField = input.readLong() + val intField = input.readInt() + new UserMessage(longField, intField) + } + } + + +###### Step2: Distribute the libraries + +Distribute the jar file to the lib/ folder of every Gearpump installation in the cluster. + +###### Step3: Change gear.conf on every machine in the cluster: + + :::json + gearpump { + serializers { + "org.apache.gearpump.UserMessage" = "org.apache.gearpump.UserMessageSerializer" + } + } + + +###### All set! + +##### Define Application level custom serializer +If all you want is to define an application level serializer, which is visible only to the current application's AppMaster and Executors (including tasks), you can follow a different approach. + +###### Step1: Define your custom Serializer class + +You should include the Serializer class in your application jar. 
Here is an example of defining a custom serializer: + + :::scala + package org.apache.gearpump + + import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer} + import org.apache.gearpump.esotericsoftware.kryo.io.{Input, Output} + + // fields must be vals so the serializer can read them back + class UserMessage(val longField: Long, val intField: Int) + + class UserMessageSerializer extends Serializer[UserMessage] { + override def write(kryo: Kryo, output: Output, obj: UserMessage) = { + output.writeLong(obj.longField) + output.writeInt(obj.intField) + } + + override def read(kryo: Kryo, input: Input, typ: Class[UserMessage]): UserMessage = { + val longField = input.readLong() + val intField = input.readInt() + new UserMessage(longField, intField) + } + } + + +###### Step2: Put an application.conf on your classpath on the client machine from which you submit the application: + + :::json + ### content of application.conf + gearpump { + serializers { + "org.apache.gearpump.UserMessage" = "org.apache.gearpump.UserMessageSerializer" + } + } + + +###### Step3: All set! + +#### Advanced: Choose another serialization framework + +Note: this is only for advanced users who require deep customization of the Gearpump platform. + +There are other serialization frameworks besides Kryo, like Protobuf. If you don't want to use the built-in Kryo serialization framework, you can plug in a new one. + +Basically, you need to add a definition in gear.conf (or application.conf for a single application's scope) like this: + + :::bash + gearpump.serialization-framework = "org.apache.gearpump.serializer.CustomSerializationFramework" + + +Please find an example in the gearpump storm module; search for "StormSerializationFramework" in the source code. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/dev/dev-ide-setup.md ---------------------------------------------------------------------- diff --git a/docs/contents/dev/dev-ide-setup.md b/docs/contents/dev/dev-ide-setup.md new file mode 100644 index 0000000..fa983ba --- /dev/null +++ b/docs/contents/dev/dev-ide-setup.md @@ -0,0 +1,29 @@ +### Intellij IDE Setup + +1. In Intellij, install the Scala plugin. We are using Scala version {{SCALA_BINARY_VERSION}}. +2. Open menu "File->Open", then choose the Gearpump source folder to open the root project. +3. All set. + +**NOTE:** The Intellij Scala plugin already bundles sbt support. If you have the Scala plugin installed, please don't install an additional sbt plugin. Check your settings at "Settings -> Plugins". +**NOTE:** If you are behind a proxy, to speed up the build, please set the proxy for sbt in "Settings -> Build Tools -> SBT". In the input field "VM parameters", add + + :::bash + -Dhttp.proxyHost=<proxy host> + -Dhttp.proxyPort=<port like 911> + -Dhttps.proxyHost=<proxy host> + -Dhttps.proxyPort=<port like 911> + + +### Eclipse IDE Setup + +This section shows how to do this in Eclipse Luna. + +There is an sbt-eclipse plugin to generate Eclipse project files, but it seems to have some bugs, and some manual fixes are still required. Here are the steps that work for me: + +1. Install the latest version of Eclipse Luna. +2. Install the latest Scala IDE from http://scala-ide.org/download/current.html I use the update site address: http://download.scala-ide.org/sdk/lithium/e44/scala211/stable/site +3. Open an sbt shell under the root folder of Gearpump and enter "eclipse"; this generates all the Eclipse project files. +4. Use the Eclipse import wizard: File->Import->Existing projects into Workspace, and make sure to tick the option "Search for nested projects". +5. Eclipse may then complain about encoding errors, like "IO error while decoding". 
Fix the Eclipse default text encoding by changing "Window->Preferences->General->Workspace->Text file encoding" to UTF-8. +6. The project gearpump-external-kafka may still fail to compile. The reason is that some dependencies are missing from the .classpath file generated by sbt-eclipse, so a manual fix is needed. Right-click the gearpump-external-kafka project icon in Eclipse, choose "Build Path->Configure Build Path", and in the popup window, under the "Projects" tab, click Add and choose "gearpump-streaming". +7. All set. The project should now compile in Eclipse. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/dev/dev-non-streaming-example.md ---------------------------------------------------------------------- diff --git a/docs/contents/dev/dev-non-streaming-example.md b/docs/contents/dev/dev-non-streaming-example.md new file mode 100644 index 0000000..3d1d5e0 --- /dev/null +++ b/docs/contents/dev/dev-non-streaming-example.md @@ -0,0 +1,133 @@ +We'll use [Distributed Shell](https://github.com/apache/incubator-gearpump/blob/master/examples/distributedshell) as an example to illustrate how to do that. + +With Distributed Shell, the user sends a shell command to the cluster; the command is executed on each node, and the results are returned to the user. + +### Maven/Sbt Settings + +Repository and library dependencies can be found at [Maven Setting](http://gearpump.incubator.apache.org/downloads.html#maven-dependencies) + +### Define Executor Class + + :::scala + class ShellExecutor(executorContext: ExecutorContext, userConf : UserConfig) extends Actor { + import executorContext._ + + override def receive: Receive = { + case ShellCommand(command, args) => + val process = Try(s"$command $args" !!) + val result = process match { + case Success(msg) => msg + case Failure(ex) => ex.getMessage + } + sender ! 
ShellCommandResult(executorId, result) + } + } + +So `ShellExecutor` simply receives the `ShellCommand`, tries to execute it, and returns the result to the sender. + +### Define AppMaster Class +For a non-streaming application, you have to write your own AppMaster. + +Here is a typical user-defined AppMaster; please note that some trivial code is omitted. + + :::scala + class DistShellAppMaster(appContext : AppMasterContext, app : Application) extends ApplicationMaster { + protected var currentExecutorId = 0 + + override def preStart(): Unit = { + ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, self) + } + + override def receive: Receive = { + case ExecutorSystemStarted(executorSystem) => + import executorSystem.{address, worker, resource => executorResource} + val executorContext = ExecutorContext(currentExecutorId, worker.workerId, appId, self, executorResource) + val executor = context.actorOf(Props(classOf[ShellExecutor], executorContext, app.userConfig) + .withDeploy(Deploy(scope = RemoteScope(address))), currentExecutorId.toString) + executorSystem.bindLifeCycleWith(executor) + currentExecutorId += 1 + case StartExecutorSystemTimeout => + masterProxy ! ShutdownApplication(appId) + context.stop(self) + case msg: ShellCommand => + Future.fold(context.children.map(_ ? 
msg))(new ShellCommandResultAggregator) { (aggregator, response) => + aggregator.aggregate(response.asInstanceOf[ShellCommandResult]) + }.map(_.toString()) pipeTo sender + } + + private def getExecutorJvmConfig: ExecutorSystemJvmConfig = { + val config: Config = Option(app.clusterConfig).map(_.getConfig).getOrElse(ConfigFactory.empty()) + val jvmSetting = Util.resolveJvmSetting(config.withFallback(context.system.settings.config)).executor + ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, + appJar, username, config) + } + } + + +When `DistShellAppMaster` starts, it first requests resources to launch one executor on each node; this is done in the method `preStart`. + +Then `DistShellAppMaster`'s receive handler uses the allocated resources to launch the `ShellExecutor` we want. If you want to write your own application, you can reuse this part of the code; the only thing needed is to replace the Executor class. + +Resource allocation may fail, which results in the message `StartExecutorSystemTimeout`; the normal pattern to handle it is just what we do here: shut down the application. + +The real application logic is in the `ShellCommand` message handler, which is application-specific. Here we distribute the shell command to each executor and aggregate the results for the client. + +The method `getExecutorJvmConfig` can be reused as-is in your own application. + +### Define Application +Now it's time to launch the application. 
+ + :::scala + object DistributedShell extends App with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array.empty + + LOG.info(s"Distributed shell submitting application...") + val context = ClientContext() + val appId = context.submit(Application[DistShellAppMaster]("DistributedShell", UserConfig.empty)) + context.close() + LOG.info(s"Distributed Shell Application started with appId $appId !") + } + +The application class extends `App` and `ArgumentsParser`, which makes it easier to parse arguments and run the main function. This part is similar to streaming applications. + +The main class `DistributedShell` will submit an application to `Master`, whose `AppMaster` is `DistShellAppMaster`. + +### Define an optional Client class + +Now, we can define a `Client` class to talk with the `AppMaster` and pass our commands to it. + + :::scala + object DistributedShellClient extends App with ArgumentsParser { + implicit val timeout = Constants.FUTURE_TIMEOUT + import scala.concurrent.ExecutionContext.Implicits.global + private val LOG: Logger = LoggerFactory.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "master" -> CLIOption[String]("<host1:port1,host2:port2,host3:port3>", required = true), + "appid" -> CLIOption[Int]("<the distributed shell appid>", required = true), + "command" -> CLIOption[String]("<shell command>", required = true), + "args" -> CLIOption[String]("<shell arguments>", required = true) + ) + + val config = parse(args) + val context = ClientContext(config.getString("master")) + val appid = config.getInt("appid") + val command = config.getString("command") + val arguments = config.getString("args") + val appMaster = context.resolveAppID(appid) + (appMaster ? 
ShellCommand(command, arguments)).map { result => + LOG.info(s"Result: $result") + context.close() + } + } + + +`DistributedShellClient` resolves the appid to the real AppMaster (the application id is printed when `DistributedShell` is launched). + +Once we have the `AppMaster`, we can send a `ShellCommand` to it and wait for the result. + +### Submit application + +After all this, you need to package everything into an uber jar and submit the jar to the Gearpump cluster. Please check [Application submission tool](../introduction/commandline) for the command line tool syntax. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/dev/dev-rest-api.md ---------------------------------------------------------------------- diff --git a/docs/contents/dev/dev-rest-api.md b/docs/contents/dev/dev-rest-api.md new file mode 100644 index 0000000..85c4706 --- /dev/null +++ b/docs/contents/dev/dev-rest-api.md @@ -0,0 +1,1083 @@ +## Authentication + +All REST API calls require authentication by default. If you don't want authentication, you can disable it. + +### How to disable Authentication +To disable authentication, you can set `gearpump-ui.gearpump.ui-security.authentication-enabled = false` +in gear.conf; please check [UI Authentication](../deployment/deployment-ui-authentication) for details. + +### How to authenticate if Authentication is enabled. + +#### For User-Password based authentication + +If authentication is enabled, you need to log in before calling the REST API. + + :::bash + curl -X POST --data username=admin --data password=admin --cookie-jar outputAuthenticationCookie.txt http://127.0.0.1:8090/login + + +This uses the default user "admin:admin" to log in, and stores the authentication cookie in the file outputAuthenticationCookie.txt. + +In all subsequent REST API calls, you need to add the authentication cookie. 
For example: + + :::bash + curl --cookie outputAuthenticationCookie.txt http://127.0.0.1/api/v1.0/master + + +For more information, please check [UI Authentication](../deployment/deployment-ui-authentication). + +#### For OAuth2 based authentication + +OAuth2 based authentication requires you to have an access token in place. + +Different OAuth2 service providers have different ways to return an access token. + +**For Google**, you can refer to [OAuth Doc](https://developers.google.com/identity/protocols/OAuth2). + +**For CloudFoundry UAA**, you can use the uaac command to get the access token. + + :::bash + $ uaac target http://login.gearpump.gotapaas.eu/ + $ uaac token get <user_email_address> + + ### Find access token + $ uaac context + + [0]*[http://login.gearpump.gotapaas.eu] + + [0]*[<user_email_address>] + user_id: 34e33a79-42c6-479b-a8c1-8c471ff027fb + client_id: cf + token_type: bearer + access_token: eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI + expires_in: 599 + scope: password.write openid cloud_controller.write cloud_controller.read + jti: 74ea49e4-1001-4757-9f8d-a66e52a27557 + + +For more information on uaac, please check the [UAAC guide](https://docs.cloudfoundry.org/adminguide/uaa-user-management.html). + +Now that we have the access token, let's log in to the Gearpump UI server with it: + + :::bash + ## Please replace cloudfoundryuaa with the actual OAuth2 service name you have configured in gear.conf + curl -X POST --data accesstoken=eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI --cookie-jar outputAuthenticationCookie.txt http://127.0.0.1:8090/login/oauth2/cloudfoundryuaa/accesstoken + + +This logs in as user `user_email_address` and stores the authentication cookie in the file outputAuthenticationCookie.txt. + +In all subsequent REST API calls, you need to add the authentication cookie. 
For example: + + :::bash + curl --cookie outputAuthenticationCookie.txt http://127.0.0.1/api/v1.0/master + + +**NOTE:** You can set the default permission level for OAuth2 users. For more information, +please check [UI Authentication](../deployment/deployment-ui-authentication). + +## Query version + +### GET version + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/version + + +Sample Response: + + :::bash + {{GEARPUMP_VERSION}} + + +## Master Service + +### GET api/v1.0/master +Get information about the masters + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/master + + +Sample Response: + + :::json + { + "masterDescription": { + "leader":{"host":"[email protected]","port":3000}, + "cluster":[{"host":"127.0.0.1","port":3000}], + "aliveFor": "642941", + "logFile": "/Users/foobar/gearpump/logs", + "jarStore": "jarstore/", + "masterStatus": "synced", + "homeDirectory": "/Users/foobar/gearpump" + } + } + + +### GET api/v1.0/master/applist +Query information about all applications + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/master/applist + + +Sample Response: + + :::json + { + "appMasters": [ + { + "status": "active", + "appId": 1, + "appName": "wordCount", + "appMasterPath": "akka.tcp://[email protected]:52212/user/daemon/appdaemon1/$c", + "workerPath": "akka.tcp://[email protected]:3000/user/Worker0", + "submissionTime": "1450758114766", + "startTime": "1450758117294", + "user": "lisa" + } + ] + } + +### GET api/v1.0/master/workerlist +Query information about all workers + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/master/workerlist + +Sample Response: + + :::json + [ + { + "workerId": "1", + "state": "active", + "actorPath": "akka.tcp://[email protected]:3000/user/Worker0", + "aliveFor": "431565", + "logFile": "logs/", + "executors": [ + { + "appId": 1, + 
"executorId": -1, + "slots": 1 + }, + { + "appId": 1, + "executorId": 0, + "slots": 1 + } + ], + "totalSlots": 1000, + "availableSlots": 998, + "homeDirectory": "/usr/lisa/gearpump/", + "jvmName": "11788@lisa" + }, + { + "workerId": "0", + "state": "active", + "actorPath": "akka.tcp://[email protected]:3000/user/Worker1", + "aliveFor": "431546", + "logFile": "logs/", + "executors": [ + { + "appId": 1, + "executorId": 1, + "slots": 1 + } + ], + "totalSlots": 1000, + "availableSlots": 999, + "homeDirectory": "/usr/lisa/gearpump/", + "jvmName": "11788@lisa" + } + ] + +### GET api/v1.0/master/config +Get the configuration of all masters + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/master/config + +Sample Response: + + :::json + { + "extensions": [ + "akka.contrib.datareplication.DataReplication$" + ] + "akka": { + "loglevel": "INFO" + "log-dead-letters": "off" + "log-dead-letters-during-shutdown": "off" + "actor": { + ## Master forms a akka cluster + "provider": "akka.cluster.ClusterActorRefProvider" + } + "cluster": { + "roles": ["master"] + "auto-down-unreachable-after": "15s" + } + "remote": { + "log-remote-lifecycle-events": "off" + } + } + } + + +### GET api/v1.0/master/metrics/<query_path>?readLatest=<true|false> +Get the master node metrics. 
+ +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/master/metrics/master?readLatest=true + +Sample Response: + + :::bash + { + "path" + : + "master", "metrics" + : + [{ + "time": "1450758725070", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:memory.heap.used", "value": "59764272"} + }, { + "time": "1450758725070", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:thread.daemon.count", "value": "18"} + }, { + "time": "1450758725070", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "master:memory.total.committed", + "value": "210239488" + } + }, { + "time": "1450758725070", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:memory.heap.max", "value": "880017408"} + }, { + "time": "1450758725070", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:memory.total.max", "value": "997457920"} + }, { + "time": "1450758725070", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "master:memory.heap.committed", + "value": "179830784" + } + }, { + "time": "1450758725070", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:memory.total.used", "value": "89117352"} + }, { + "time": "1450758725070", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "master:thread.count", "value": "28"} + }] + } + + +### POST api/v1.0/master/submitapp +Submit a streaming job jar to Gearpump cluster. It functions like command line + + :::bash + gear app -jar xx.jar -conf yy.conf -executors 1 <command line arguments> + + +Required MIME type: "multipart/form-data" + +Required post form fields: + +1. field name "jar", job jar file. + +Optional post form fields: + +1. "configfile", configuration file, in UTF8 format. +2. "configstring", text body of configuration file, in UTF8 format. +3. 
"executorcount", the number of JVM processes to start across the cluster for this application job +4. "args", command line arguments for this job jar. + +Example html: + + :::html + <form id="submitapp" action="http://127.0.0.1:8090/api/v1.0/master/submitapp" + method="POST" enctype="multipart/form-data"> + + Job Jar (*.jar) [Required]: <br/> + <input type="file" name="jar"/> <br/> <br/> + + Config file (*.conf) [Optional]: <br/> + <input type="file" name="configfile"/> <br/> <br/> + + Config String, Config File in string format. [Optional]: <br/> + <input type="text" name="configstring" value="a.b.c.d=1"/> <br/><br/> + + Executor count (integer, how many processes to start for this streaming job) [Optional]: <br/> + <input type="text" name="executorcount" value="1"/> <br/><br/> + + Application arguments (String) [Optional]: <br/> + <input type="text" name="args" value=""/> <br/><br/> + + <input type="submit" value="Submit"/> + + </form> + +### POST api/v1.0/master/submitstormapp +Submit a storm jar to the Gearpump cluster. It functions like the command line + + :::bash + storm app -jar xx.jar -conf yy.yaml <command line arguments> + +Required MIME type: "multipart/form-data" + +Required post form fields: + +1. field name "jar", job jar file. + +Optional post form fields: + +1. "configfile", .yaml configuration file, in UTF8 format. +2. "args", command line arguments for this job jar. 
+ +Example html: + + :::html + <form id="submitstormapp" action="http://127.0.0.1:8090/api/v1.0/master/submitstormapp" + method="POST" enctype="multipart/form-data"> + + Job Jar (*.jar) [Required]: <br/> + <input type="file" name="jar"/> <br/> <br/> + + Config file (*.yaml) [Optional]: <br/> + <input type="file" name="configfile"/> <br/> <br/> + + Application arguments (String) [Optional]: <br/> + <input type="text" name="args" value=""/> <br/><br/> + + <input type="submit" value="Submit"/> + + </form> + + +## Worker service + +### GET api/v1.0/worker/<workerId> +Query worker information. + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/worker/0 + +Sample Response: + + :::json + { + "workerId": "0", + "state": "active", + "actorPath": "akka.tcp://[email protected]:3000/user/Worker1", + "aliveFor": "831069", + "logFile": "logs/", + "executors": [ + { + "appId": 1, + "executorId": 1, + "slots": 1 + } + ], + "totalSlots": 1000, + "availableSlots": 999, + "homeDirectory": "/usr/lisa/gearpump/", + "jvmName": "11788@lisa" + } + +### GET api/v1.0/worker/<workerId>/config +Query worker config + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/worker/0/config + + +Sample Response: + + :::json + { + "extensions": [ + "akka.contrib.datareplication.DataReplication$" + ] + "akka": { + "loglevel": "INFO" + "log-dead-letters": "off" + "log-dead-letters-during-shutdown": "off" + "actor": { + ## Master forms an akka cluster + "provider": "akka.cluster.ClusterActorRefProvider" + } + "cluster": { + "roles": ["master"] + "auto-down-unreachable-after": "15s" + } + "remote": { + "log-remote-lifecycle-events": "off" + } + } + } + + +### GET api/v1.0/worker/<workerId>/metrics/<query_path>?readLatest=<true|false> +Get the worker node metrics. 
+ +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/worker/0/metrics/worker?readLatest=true + +Sample Response: + + :::json + { + "path" + : + "worker", "metrics" + : + [{ + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker1:memory.total.used", + "value": "152931440" + } + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.daemon.count", "value": "18"} + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker0:memory.heap.used", + "value": "123139640" + } + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker0:memory.total.max", + "value": "997457920" + } + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker0:memory.heap.committed", + "value": "179830784" + } + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.count", "value": "28"} + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.heap.max", "value": "880017408"} + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.heap.max", "value": "880017408"} + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker0:memory.total.committed", + "value": "210239488" + } + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker0:memory.total.used", + "value": "152931440" + } + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.count", "value": "28"} 
+ }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker1:memory.total.max", + "value": "997457920" + } + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker1:memory.heap.committed", + "value": "179830784" + } + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker1:memory.total.committed", + "value": "210239488" + } + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.daemon.count", "value": "18"} + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker1:memory.heap.used", + "value": "123139640" + } + }] + } + + +## Supervisor Service + +Supervisor service allows user to add or remove a worker machine. + +### POST api/v1.0/supervisor/status +Query whether the supervisor service is enabled. If Supervisor service is disabled, you are not allowed to use API like addworker/removeworker. + +Example: + + :::bash + curl -X POST [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/supervisor/status + +Sample Response: + + :::json + {"enabled":true} + + +### GET api/v1.0/supervisor +Get the supervisor path + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/supervisor + + +Sample Response: + + :::json + {path: "supervisor actor path"} + +### POST api/v1.0/supervisor/addworker/<worker-count> +Add workerCount new workers in the cluster. It will use the low level resource scheduler like +YARN to start new containers and then boot Gearpump worker process. 
+ +Example: + + :::bash + curl -X POST [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/supervisor/addworker/2 + + + +Sample Response: + + :::json + {success: true} + +### POST api/v1.0/supervisor/removeworker/<worker-id> +Remove single worker instance by specifying a worker Id. + +**NOTE:* Use with caution! + +**NOTE:** All executors JVMs under this worker JVM will also be destroyed. It will trigger failover for all +applications that have executor started under this worker. + +Example: + + :::bash + curl -X POST [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/supervisor/removeworker/3 + + +Sample Response: + + :::json + {success: true} + +## Application service + +### GET api/v1.0/appmaster/<appId>?detail=<true|false> +Query information of an specific application of Id appId + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/appmaster/1?detail=true + +Sample Response: + + :::json + { + "appId": 1, + "appName": "wordCount", + "processors": [ + [ + 0, + { + "id": 0, + "taskClass": "org.apache.gearpump.streaming.examples.wordcount.Split", + "parallelism": 1, + "description": "", + "taskConf": { + "_config": {} + }, + "life": { + "birth": "0", + "death": "9223372036854775807" + }, + "executors": [ + 1 + ], + "taskCount": [ + [ + 1, + { + "count": 1 + } + ] + ] + } + ], + [ + 1, + { + "id": 1, + "taskClass": "org.apache.gearpump.streaming.examples.wordcount.Sum", + "parallelism": 1, + "description": "", + "taskConf": { + "_config": {} + }, + "life": { + "birth": "0", + "death": "9223372036854775807" + }, + "executors": [ + 0 + ], + "taskCount": [ + [ + 0, + { + "count": 1 + } + ] + ] + } + ] + ], + "processorLevels": [ + [ + 0, + 0 + ], + [ + 1, + 1 + ] + ], + "dag": { + "vertexList": [ + 0, + 1 + ], + "edgeList": [ + [ + 0, + "org.apache.gearpump.partitioner.HashPartitioner", + 1 + ] + ] + }, + "actorPath": "akka.tcp://[email 
protected]:52212/user/daemon/appdaemon1/$c/appmaster", + "clock": "1450759382430", + "executors": [ + { + "executorId": 0, + "executor": "akka.tcp://[email protected]:52240/remote/akka.tcp/[email protected]:52212/user/daemon/appdaemon1/$c/appmaster/executors/0#-1554950276", + "workerId": "1", + "status": "active" + }, + { + "executorId": 1, + "executor": "akka.tcp://[email protected]:52241/remote/akka.tcp/[email protected]:52212/user/daemon/appdaemon1/$c/appmaster/executors/1#928082134", + "workerId": "0", + "status": "active" + }, + { + "executorId": -1, + "executor": "akka://app1-executor-1/user/daemon/appdaemon1/$c/appmaster", + "workerId": "1", + "status": "active" + } + ], + "startTime": "1450758117306", + "uptime": "1268472", + "user": "lisa", + "homeDirectory": "/usr/lisa/gearpump/", + "logFile": "logs/", + "historyMetricsConfig": { + "retainHistoryDataHours": 72, + "retainHistoryDataIntervalMs": 3600000, + "retainRecentDataSeconds": 300, + "retainRecentDataIntervalMs": 15000 + } + } + + +### DELETE api/v1.0/appmaster/<appId> +shutdown application appId + +### GET api/v1.0/appmaster/<appId>/stallingtasks +Query list of unhealthy tasks of an specific application of Id appId + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/appmaster/2/stallingtasks + + +Sample Response: + + :::json + { + "tasks": [ + { + "processorId": 0, + "index": 0 + } + ] + } + + +### GET api/v1.0/appmaster/<appId>/config +Query the configuration of specific application appId + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/appmaster/1/config + + +Sample Response: + + :::json + { + "gearpump" : { + "appmaster" : { + "extraClasspath" : "", + "vmargs" : "-server -Xms512M -Xmx1024M -Xss1M -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC -XX:NewRatio=3" + }, + "cluster" : { + "masters" : [ + "127.0.0.1:3000" + ] + }, + 
"executor" : { + "extraClasspath" : "", + "vmargs" : "-server -Xms512M -Xmx1024M -Xss1M -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC -XX:NewRatio=3" + }, + "jarstore" : { + "rootpath" : "jarstore/" + }, + "log" : { + "application" : { + "dir" : "logs" + }, + "daemon" : { + "dir" : "logs" + } + }, + "metrics" : { + "enabled" : true, + "graphite" : { + "host" : "127.0.0.1", + "port" : 2003 + }, + "logfile" : {}, + "report-interval-ms" : 15000, + "reporter" : "akka", + "retainHistoryData" : { + "hours" : 72, + "intervalMs" : 3600000 + }, + "retainRecentData" : { + "intervalMs" : 15000, + "seconds" : 300 + }, + "sample-rate" : 10 + }, + "netty" : { + "base-sleep-ms" : 100, + "buffer-size" : 5242880, + "flush-check-interval" : 10, + "max-retries" : 30, + "max-sleep-ms" : 1000, + "message-batch-size" : 262144 + }, + "netty-dispatcher" : "akka.actor.default-dispatcher", + "scheduling" : { + "scheduler-class" : "org.apache.gearpump.cluster.scheduler.PriorityScheduler" + }, + "serializers" : { + "[B" : "", + "[C" : "", + "[D" : "", + "[F" : "", + "[I" : "", + "[J" : "", + "[Ljava.lang.String;" : "", + "[S" : "", + "[Z" : "", + "org.apache.gearpump.Message" : "org.apache.gearpump.streaming.MessageSerializer", + "org.apache.gearpump.streaming.task.Ack" : "org.apache.gearpump.streaming.AckSerializer", + "org.apache.gearpump.streaming.task.AckRequest" : "org.apache.gearpump.streaming.AckRequestSerializer", + "org.apache.gearpump.streaming.task.LatencyProbe" : "org.apache.gearpump.streaming.LatencyProbeSerializer", + "org.apache.gearpump.streaming.task.TaskId" : "org.apache.gearpump.streaming.TaskIdSerializer", + "scala.Tuple1" : "", + "scala.Tuple2" : "", + "scala.Tuple3" : "", + "scala.Tuple4" : "", + "scala.Tuple5" : "", + "scala.Tuple6" : "", + "scala.collection.immutable.$colon$colon" : "", + "scala.collection.immutable.List" : "" + }, + "services" : { + # gear.conf: 112 + "host" : "127.0.0.1", + # gear.conf: 
113 + "http" : 8090, + # gear.conf: 114 + "ws" : 8091 + }, + "task-dispatcher" : "akka.actor.pined-dispatcher", + "worker" : { + # reference.conf: 100 + # # How many slots each worker contains + "slots" : 100 + } + } + } + + +### GET api/v1.0/appmaster/<appId>/metrics/<query_path>?readLatest=<true|false>&aggregator=<aggregator_class> +Query metrics information of a specific application appId +Filter metrics with path metrics path + +aggregator points to a aggregator class, which will aggregate on the current metrics, and return a smaller set. + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/appmaster/1/metrics/app1?readLatest=true&aggregator=org.apache.gearpump.streaming.metrics.ProcessorAggregator + + +Sample Response: + + :::json + { + "path" + : + "worker", "metrics" + : + [{ + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker1:memory.total.used", + "value": "152931440" + } + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.daemon.count", "value": "18"} + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker0:memory.heap.used", + "value": "123139640" + } + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker0:memory.total.max", + "value": "997457920" + } + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker0:memory.heap.committed", + "value": "179830784" + } + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.count", "value": "28"} + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:memory.heap.max", "value": "880017408"} + }, { + "time": 
"1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:memory.heap.max", "value": "880017408"} + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker0:memory.total.committed", + "value": "210239488" + } + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker0:memory.total.used", + "value": "152931440" + } + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker1:thread.count", "value": "28"} + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker1:memory.total.max", + "value": "997457920" + } + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker1:memory.heap.committed", + "value": "179830784" + } + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker1:memory.total.committed", + "value": "210239488" + } + }, { + "time": "1450759137860", + "value": {"$type": "org.apache.gearpump.metrics.Metrics.Gauge", "name": "worker0:thread.daemon.count", "value": "18"} + }, { + "time": "1450759137860", + "value": { + "$type": "org.apache.gearpump.metrics.Metrics.Gauge", + "name": "worker1:memory.heap.used", + "value": "123139640" + } + }] + } + + +### GET api/v1.0/appmaster/<appId>/errors +Get task error messages + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/appmaster/1/errors + + +Sample Response: + + :::json + {"time":"0","error":null} + + +### POST api/v1.0/appmaster/<appId>/restart +Restart the application + +## Executor Service + +### GET api/v1.0/appmaster/<appId>/executor/<executorid>/config +Get executor config + +Example: + + :::bash + curl 
http://127.0.0.1:8090/api/v1.0/appmaster/1/executor/1/config + + +Sample Response: + + :::json + { + "extensions": [ + "akka.contrib.datareplication.DataReplication$" + ] + "akka": { + "loglevel": "INFO" + "log-dead-letters": "off" + "log-dead-letters-during-shutdown": "off" + "actor": { + ## Master forms a akka cluster + "provider": "akka.cluster.ClusterActorRefProvider" + } + "cluster": { + "roles": ["master"] + "auto-down-unreachable-after": "15s" + } + "remote": { + "log-remote-lifecycle-events": "off" + } + } + } + + +### GET api/v1.0/appmaster/<appId>/executor/<executorid> +Get executor information. + +Example: + + :::bash + curl [--cookie outputAuthenticationCookie.txt] http://127.0.0.1:8090/api/v1.0/appmaster/1/executor/1 + + +Sample Response: + + :::json + { + "id": 1, + "workerId": "0", + "actorPath": "akka.tcp://[email protected]:52241/remote/akka.tcp/[email protected]:52212/user/daemon/appdaemon1/$c/appmaster/executors/1", + "logFile": "logs/", + "status": "active", + "taskCount": 1, + "tasks": [ + [ + 0, + [ + { + "processorId": 0, + "index": 0 + } + ] + ] + ], + "jvmName": "21304@lisa" + } + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/contents/dev/dev-storm.md ---------------------------------------------------------------------- diff --git a/docs/contents/dev/dev-storm.md b/docs/contents/dev/dev-storm.md new file mode 100644 index 0000000..e60b505 --- /dev/null +++ b/docs/contents/dev/dev-storm.md @@ -0,0 +1,214 @@ +Gearpump provides **binary compatibility** for Apache Storm applications. That is to say, users could easily grab an existing Storm jar and run it +on Gearpump. This documentation illustrates Gearpump's compatibility with Storm. 
+
+## What Storm features are supported on Gearpump
+
+### Storm 0.9.x
+
+| Feature | Support |
+| ------- | ------- |
+| basic topology | yes |
+| DRPC | yes |
+| multi-lang | yes |
+| storm-kafka | yes |
+| Trident | no |
+
+### Storm 0.10.x
+
+| Feature | Support |
+| ------- | ------- |
+| basic topology | yes |
+| DRPC | yes |
+| multi-lang | yes |
+| storm-kafka | yes |
+| storm-hdfs | yes |
+| storm-hbase | yes |
+| storm-hive | yes |
+| storm-jdbc | yes |
+| storm-redis | yes |
+| flux | yes |
+| storm-eventhubs | not verified |
+| Trident | no |
+
+### At Least Once support
+
+With ackers enabled, there are two kinds of At Least Once support in both Storm 0.9.x and Storm 0.10.x:
+
+1. The spout replays messages on message loss, as long as the spout itself is alive.
+2. If `KafkaSpout` is used, messages can be replayed from Kafka even if the spout crashes.
+
+Gearpump supports the second kind for both Storm versions.
+
+### Security support
+
+Storm 0.10.x adds security support for the following connectors:
+
+* [storm-hdfs](https://github.com/apache/storm/blob/0.10.x-branch/external/storm-hdfs/README.md)
+* [storm-hive](https://github.com/apache/storm/blob/0.10.x-branch/external/storm-hive/README.md)
+* [storm-hbase](https://github.com/apache/storm/blob/0.10.x-branch/external/storm-hbase/README.md)
+
+That means users can access Kerberos-enabled HDFS, Hive and HBase with these connectors. Generally, Storm provides two approaches (please refer to the above links for more information):
+
+1. Configure Nimbus to automatically obtain delegation tokens on behalf of the topology submitter.
+2. Distribute Kerberos keytabs on the worker hosts beforehand; users then configure the keytab path and principal.
+
+Gearpump supports the second approach, and users need to add the classpath of HDFS/Hive/HBase to `gearpump.executor.extraClasspath` in `gear.conf` on each node.
For example,
+
+    :::bash
+    ###################
+    ### Executor argument configuration
+    ### An executor JVM can contain multiple tasks
+    ###################
+    executor {
+      vmargs = "-server -Xms512M -Xmx1024M -Xss1M -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC -XX:NewRatio=3 -Djava.rmi.server.hostname=localhost"
+      extraClasspath = "/etc/hadoop/conf"
+    }
+
+
+## How to run a Storm application on Gearpump
+
+This section shows how to run an existing Storm jar in a local Gearpump cluster.
+
+1. Launch a local cluster
+
+        :::bash
+        bin/local
+
+
+2. Start a Gearpump Nimbus server
+
+    Users need the server's address (`nimbus.host` and `nimbus.thrift.port`) to submit topologies later. The address is written to a YAML config file set with the `-output` option.
+    Users can provide an existing config file, in which case only the address will be overwritten. If not provided, a new file `app.yaml` is created with the config.
+
+        :::bash
+        bin/storm nimbus -output [conf <custom yaml config>]
+
+
+3. Submit Storm applications
+
+    Users can submit Storm applications either through the command line or through the UI.
+
+    a. Submit a Storm application through the command line:
+
+        :::bash
+        bin/storm app -verbose -config app.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation
+
+
+    Users can configure their applications through the following options:
+
+    * `jar` - set the path of a Storm application jar
+    * `config` - submit the custom configuration file generated when launching Nimbus
+
+
+    b. Submit a Storm application through the UI:
+
+    1. Click on the "Create" button on the applications page on UI.
+    2. Click on the "Submit Storm Application" item in the pull down menu.
+    3. In the popup console, upload the Storm application jar and the configuration file generated when launching Nimbus,
+       and fill in `storm.starter.ExclamationTopology exclamation` as arguments.
+    4. Click on the "Submit" button.
+
+    Either way, check the dashboard and you should see data flowing through your topology.
+
+## How is it different from running on Storm
+
+### Topology submission
+
+When a client submits a Storm topology, Gearpump locally launches `GearpumpNimbus`, a simplified version of Storm's Nimbus server. `GearpumpNimbus` then translates the topology into a Gearpump directed acyclic graph (DAG), which is submitted to the Gearpump master and deployed as a Gearpump application.
+
+`GearpumpNimbus` supports the following methods:
+
+* `submitTopology` / `submitTopologyWithOpts`
+* `killTopology` / `killTopologyWithOpts`
+* `getTopology` / `getUserTopology`
+* `getClusterInfo`
+
+### Topology translation
+
+Here's an example of `WordCountTopology` with acker bolts (ackers) being translated into a Gearpump DAG.
+
+Gearpump creates a `StormProducer` for each Storm spout and a `StormProcessor` for each Storm bolt (except for ackers) with the same parallelism, and wires them together using the same grouping strategy (partitioning in Gearpump) as in Storm.
+
+At runtime, spouts and bolts run inside `StormProducer` tasks and `StormProcessor` tasks respectively. Messages emitted by a spout are passed to its `StormProducer`, transferred to a `StormProcessor`, and passed down to the bolt. Messages are serialized / de-serialized with Storm serializers.
+
+Storm ackers are dropped since Gearpump has a different mechanism for message tracking and flow control.
+
+### Task execution
+
+Each Storm task is executed by a dedicated thread, while all Gearpump tasks of an executor share a thread pool. Generally, a shared thread pool achieves better performance. It is possible, however, that some tasks block and take up all the threads. In that case, we can
+fall back to the Storm way by setting `gearpump.task-dispatcher` to `"gearpump.single-thread-dispatcher"` in `gear.conf`.
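+
+As a sketch, the fallback above is a single override in `gear.conf` (only the `task-dispatcher` key changes; every other setting keeps its shipped default):
+
+    :::bash
+    gearpump {
+      ## Run each task on a dedicated thread, as Storm does,
+      ## instead of the default shared thread pool.
+      task-dispatcher = "gearpump.single-thread-dispatcher"
+    }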
+
+### Message tracking
+
+Storm tracks the lineage of each message with ackers to guarantee at-least-once message delivery. Failed messages are re-sent from the spout.
+
+Gearpump [tracks messages between a sender and receiver in an efficient way](../introduction/gearpump-internals#how-do-we-detect-message-loss). Message loss causes the whole application to replay from the [minimum timestamp of all pending messages in the system](../introduction/gearpump-internals#application-clock-and-global-clock-service).
+
+### Flow control
+
+Storm throttles the flow rate at the spout, which stops sending messages once the number of unacked messages exceeds `topology.max.spout.pending`.
+
+Gearpump has flow control between tasks such that a [sender cannot flood a receiver](../introduction/gearpump-internals#how-do-we-do-flow-control), with backpressure propagating all the way back to the source.
+
+### Configurations
+
+All Storm configurations are respected, with the following priority order:
+
+    :::bash
+    defaults.yaml < custom file config < application config < component config
+
+
+where
+
+* application config is submitted from the Storm application along with the topology
+* component config is set in a spout / bolt with `getComponentConfiguration`
+* custom file config is specified with the `-config` option when submitting a Storm application from the command line, or uploaded from the UI
+
+## StreamCQL Support
+
+[StreamCQL](https://github.com/HuaweiBigData/StreamCQL) is a Continuous Query Language for real-time computation systems, open sourced by Huawei.
+Since StreamCQL already supports Storm, it is straightforward to run StreamCQL over Gearpump.
+
+1. Install StreamCQL as in the official [README](https://github.com/HuaweiBigData/StreamCQL#install-streamcql)
+
+2. Launch the Gearpump Nimbus server as before
+
+3. Go to the installed stream-cql-binary, and change the following settings in `conf/streaming-site.xml` to the output Nimbus configs in Step 2.
+
+        :::xml
+        <property>
+            <name>streaming.storm.nimbus.host</name>
+            <value>${nimbus.host}</value>
+        </property>
+        <property>
+            <name>streaming.storm.nimbus.port</name>
+            <value>${nimbus.thrift.port}</value>
+        </property>
+
+
+4. Open the CQL client shell with `bin/cql` and execute a simple CQL example
+
+        :::sql
+        Streaming> CREATE INPUT STREAM s
+            (id INT, name STRING, type INT)
+        SOURCE randomgen
+            PROPERTIES ( timeUnit = "SECONDS", period = "1",
+                eventNumPerperiod = "1", isSchedule = "true" );
+
+        CREATE OUTPUT STREAM rs
+            (type INT, cc INT)
+        SINK consoleOutput;
+
+        INSERT INTO STREAM rs SELECT type, COUNT(id) as cc
+        FROM s[RANGE 20 SECONDS BATCH]
+        WHERE id > 5 GROUP BY type;
+
+        SUBMIT APPLICATION example;
+
+
+5. Check the dashboard and you should see data flowing through a topology of 3 components.
