http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart.java deleted file mode 100644 index 3bd2414..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart.java +++ /dev/null @@ -1,88 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.edgent.samples.connectors.iotp; - -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import org.apache.edgent.connectors.iot.IotDevice; -import org.apache.edgent.connectors.iot.QoS; -import org.apache.edgent.connectors.iotp.IotpDevice; -import org.apache.edgent.providers.direct.DirectProvider; -import org.apache.edgent.topology.TStream; -import org.apache.edgent.topology.Topology; - -import com.google.gson.JsonObject; - -/** - * IBM Watson IoT Platform Quickstart sample. 
- * Submits a JSON device event every second using the - * same format as the Quickstart device simulator, - * with keys {@code temp}, {@code humidity} and {@code objectTemp} - * and random values. - * <P> - * The device type is {@code iotsamples-edgent} and a random - * device identifier is generated. Both are printed out when - * the application starts. - * </P> - * A URL is also printed that allows viewing of the data - * as it received by the Quickstart service. - * - * <p>See {@code scripts/connectors/iotp/README} for information about running the sample. - */ -public class IotpQuickstart { - - public static void main(String[] args) { - - DirectProvider tp = new DirectProvider(); - Topology topology = tp.newTopology("IotpQuickstart"); - - // Declare a connection to IoTF Quickstart service - String deviceId = "qs" + Long.toHexString(new Random().nextLong()); - IotDevice device = IotpDevice.quickstart(topology, deviceId); - - System.out.println("Quickstart device type:" + IotpDevice.QUICKSTART_DEVICE_TYPE); - System.out.println("Quickstart device id :" + deviceId); - System.out.println("https://quickstart.internetofthings.ibmcloud.com/#/device/" - + deviceId); - - Random r = new Random(); - TStream<double[]> raw = topology.poll(() -> { - double[] v = new double[3]; - - v[0] = r.nextGaussian() * 10.0 + 40.0; - v[1] = r.nextGaussian() * 10.0 + 50.0; - v[2] = r.nextGaussian() * 10.0 + 60.0; - - return v; - }, 1, TimeUnit.SECONDS); - - TStream<JsonObject> json = raw.map(v -> { - JsonObject j = new JsonObject(); - j.addProperty("temp", v[0]); - j.addProperty("humidity", v[1]); - j.addProperty("objectTemp", v[2]); - return j; - }); - - device.events(json, "sensors", QoS.FIRE_AND_FORGET); - - tp.submit(topology); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java deleted file mode 100644 index ac4ee92..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java +++ /dev/null @@ -1,118 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.edgent.samples.connectors.iotp; - -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import org.apache.edgent.connectors.iot.QoS; -import org.apache.edgent.connectors.iotp.IotpDevice; -import org.apache.edgent.providers.direct.DirectProvider; -import org.apache.edgent.topology.TStream; -import org.apache.edgent.topology.Topology; - -import com.google.gson.JsonObject; -import com.ibm.iotf.client.device.DeviceClient; -import com.ibm.iotf.devicemgmt.DeviceData; -import com.ibm.iotf.devicemgmt.device.ManagedDevice; - -/** - * IBM Watson IoT Platform Quickstart sample. - * Submits a JSON device event every second using the - * same format as the Quickstart device simulator, - * with keys {@code temp}, {@code humidity} and {@code objectTemp} - * and random values. - * <P> - * The device type is {@code iotsamples-edgent} and a random - * device identifier is generated. Both are printed out when - * the application starts. - * <P> - * A URL is also printed that allows viewing of the data - * as it received by the Quickstart service. - * <P> - * This sample demonstrates using the WIoTP API to initialize the IotpDevice - * connector as well as the ability to publish events using the WIoTP HTTP protocol. - * - * <p>See {@code scripts/connectors/iotp/README} for information about running the sample. 
- */ -public class IotpQuickstart2 { - - public static void main(String[] args) throws Exception { - List<String> argList = Arrays.asList(args); - boolean useDeviceClient = argList.contains("useDeviceClient"); - boolean useHttp = argList.contains("useHttp"); - - DirectProvider tp = new DirectProvider(); - Topology topology = tp.newTopology("IotpQuickstart"); - - // Declare a connector to IoTP Quickstart service, initializing with WIoTP API - String deviceId = "qs" + Long.toHexString(new Random().nextLong()); - Properties options = new Properties(); - options.setProperty("org", "quickstart"); - options.setProperty("type", IotpDevice.QUICKSTART_DEVICE_TYPE); - options.setProperty("id", deviceId); - IotpDevice device; - if (useDeviceClient) { - System.out.println("Using WIoTP DeviceClient"); - device = new IotpDevice(topology, new DeviceClient(options)); - } - else { - System.out.println("Using WIoTP ManagedDevice"); - DeviceData deviceData = new DeviceData.Builder().build(); - device = new IotpDevice(topology, new ManagedDevice(options, deviceData)); - } - - System.out.println("Quickstart device type:" + IotpDevice.QUICKSTART_DEVICE_TYPE); - System.out.println("Quickstart device id :" + deviceId); - System.out.println("https://quickstart.internetofthings.ibmcloud.com/#/device/" - + deviceId); - - Random r = new Random(); - TStream<double[]> raw = topology.poll(() -> { - double[] v = new double[3]; - - v[0] = r.nextGaussian() * 10.0 + 40.0; - v[1] = r.nextGaussian() * 10.0 + 50.0; - v[2] = r.nextGaussian() * 10.0 + 60.0; - - return v; - }, 1, TimeUnit.SECONDS); - - TStream<JsonObject> json = raw.map(v -> { - JsonObject j = new JsonObject(); - j.addProperty("temp", v[0]); - j.addProperty("humidity", v[1]); - j.addProperty("objectTemp", v[2]); - return j; - }); - - if (!useHttp) { - device.events(json, "sensors", QoS.FIRE_AND_FORGET); - } - else { - System.out.println("Publishing events using HTTP"); - device.httpEvents(json, "sensors"); - } - - tp.submit(topology); - } 
- } http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpSensors.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpSensors.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpSensors.java deleted file mode 100644 index e2f4b12..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpSensors.java +++ /dev/null @@ -1,164 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package org.apache.edgent.samples.connectors.iotp; - -import java.io.File; -import java.util.concurrent.TimeUnit; - -import org.apache.edgent.connectors.iot.HeartBeat; -import org.apache.edgent.connectors.iot.IotDevice; -import org.apache.edgent.connectors.iot.QoS; -import org.apache.edgent.connectors.iotp.IotpDevice; -import org.apache.edgent.providers.direct.DirectProvider; -import org.apache.edgent.providers.direct.DirectTopology; -import org.apache.edgent.samples.topology.SensorsAggregates; -import org.apache.edgent.topology.TStream; - -import com.google.gson.JsonObject; - -/** - * Sample sending sensor device events to IBM Watson IoT Platform. <BR> - * Simulates a couple of bursty sensors and sends the readings from the sensors - * to IBM Watson IoT Platform as device events with id {@code sensors}. <BR> - * Subscribes to device commands with identifier {@code display}. - * <P> - * In addition a device event with id {@code hearbeat} is sent - * every minute. This ensure a connection attempt to IBM Watson IoT Platform - * is made immediately rather than waiting for a bursty sensor to become - * active. - * <P> - * This sample requires an IBM Watson IoT Platform service and a device configuration.<BR> - * In order to see commands send from IBM Watson IoT Platform - * there must be an analytic application - * that sends commands with the identifier {@code display}. - * </P> - * - * <p>See {@code scripts/connectors/iotp/README} for information about a - * prototype device configuration file and running the sample. - */ -public class IotpSensors { - - /** - * Run the IotpSensors application. - * - * Takes a single argument that is the path to the - * device configuration file containing the connection - * authentication information. - * - * @param args Must contain the path to the device configuration file. 
- * - * @see IotpDevice#IotpDevice(org.apache.edgent.topology.Topology, File) - */ - public static void main(String[] args) { - - String deviceCfg = args[0]; - - DirectProvider tp = new DirectProvider(); - DirectTopology topology = tp.newTopology("IotpSensors"); - - // Declare a connection to IoTF - IotDevice device = new IotpDevice(topology, new File(deviceCfg)); - - // Simulated sensors for this device. - simulatedSensors(device, true); - - // Heartbeat - heartBeat(device, true); - - // Subscribe to commands of id "display" for this - // device and print them to standard out - displayMessages(device, true); - - tp.submit(topology); - } - - - /** - * Simulate two bursty sensors and send the readings as IoTF device events - * with an identifier of {@code sensors}. - * - * @param device - * IoT device - * @param print - * True if the data submitted as events should also be printed to - * standard out. - */ - public static void simulatedSensors(IotDevice device, boolean print) { - - TStream<JsonObject> sensors = SensorsAggregates.sensorsAB(device.topology()); - if (print) - sensors.print(); - - // Send the device streams as IoTF device events - // with event identifier "sensors". - device.events(sensors, "sensors", QoS.FIRE_AND_FORGET); - } - - /** - * Create a heart beat device event with - * identifier {@code heartbeat} to - * ensure there is some immediate output and - * the connection to IoTF happens as soon as possible. - * @param device IoT device - * @param print true to print generated heartbeat tuples to System.out. - */ - public static void heartBeat(IotDevice device, boolean print) { - TStream<JsonObject> hbs = - HeartBeat.addHeartBeat(device, 1, TimeUnit.MINUTES, "heartbeat"); - if (print) - hbs.print(); - } - - - /** - * Subscribe to IoTP device commands with identifier {@code display}. 
- * Subscribing to device commands returns a stream of JSON objects that - * include a timestamp ({@code tsms}), command identifier ({@code command}) - * and payload ({@code payload}). Payload is the application specific - * portion of the command. <BR> - * In this case the payload is expected to be a JSON object containing a - * {@code msg} key with a string display message. <BR> - * The returned stream consists of the display message string extracted from - * the JSON payload. - * <P> - * Note to receive commands a analytic application must exist that generates - * them through IBM Watson IoT Platform. - * </P> - * - * @param device the device - * @param print true to print the received command's payload to System.out. - * @return the stream - * @see IotDevice#commands(String...) - */ - public static TStream<String> displayMessages(IotDevice device, boolean print) { - // Subscribe to commands of id "display" for this device - TStream<JsonObject> statusMsgs = device.commands("display"); - - // The returned JSON object includes several fields - // tsms - Timestamp in milliseconds (this is generic to a command) - // payload.msg - Status message (this is specific to this application) - - // Map to a String object containing the message - TStream<String> messages = statusMsgs.map(j -> j.getAsJsonObject("payload").getAsJsonPrimitive("msg").getAsString()); - if (print) - messages.print(); - return messages; - } -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/package-info.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/package-info.java deleted file mode 100644 index 43dcfe9..0000000 --- 
a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/package-info.java +++ /dev/null @@ -1,39 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/** - * Samples showing use of the IBM Watson IoT Platform connector - * to publish device events and subscribe to device - * commands. - * - * <p>The "Quickstart" samples connect to the IBM Watson IoT Platform - * using its Quickstart feature that does not require device registration. - * When the samples are run they print out a URL which allows a browser - * to see the data being sent from this sample. - * - * <p>The other samples connect to your IBM Watson IoT Platform service instance - * using device and application registrations that you have created with your - * service instance. - * - * <p>See each sample's Javadoc for more information. - * - * <p>See {@code scripts/connectors/iotp/README} for information about running the samples. 
- */ -package org.apache.edgent.samples.connectors.iotp; - http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java deleted file mode 100644 index a0264f1..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java +++ /dev/null @@ -1,140 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.edgent.samples.connectors.jdbc; - -import java.lang.reflect.Method; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Properties; - -import javax.sql.DataSource; - -/** - * Utilities for the sample's non-streaming JDBC database related actions. - */ -public class DbUtils { - - /** - * Get the JDBC {@link DataSource} for the database. - * <p> - * The "db.name" property specifies the name of the database. - * Defaults to "JdbcConnectorSampleDb". 
- * - * @param props configuration properties - * @return the DataSource - * @throws Exception on failure - */ - public static DataSource getDataSource(Properties props) throws Exception { - return createDerbyEmbeddedDataSource(props); - } - - /** - * Initialize the sample's database. - * <p> - * Tables are created as needed and purged. - * @param ds the DataSource - * @throws Exception on failure - */ - public static void initDb(DataSource ds) throws Exception { - createTables(ds); - purgeTables(ds); - } - - /** - * Purge the sample's tables - * @param ds the DataSource - * @throws Exception on failure - */ - public static void purgeTables(DataSource ds) throws Exception { - try (Connection cn = ds.getConnection()) { - Statement stmt = cn.createStatement(); - stmt.execute("DELETE FROM persons"); - } - } - - private static void createTables(DataSource ds) throws Exception { - try (Connection cn = ds.getConnection()) { - Statement stmt = cn.createStatement(); - stmt.execute("CREATE TABLE persons " - + "(" - + "id INTEGER NOT NULL," - + "firstname VARCHAR(40) NOT NULL," - + "lastname VARCHAR(40) NOT NULL," - + "PRIMARY KEY (id)" - + ")" - ); - } - catch (SQLException e) { - if (e.getLocalizedMessage().contains("already exists")) - return; - else - throw e; - } - } - - private static DataSource createDerbyEmbeddedDataSource(Properties props) throws Exception - { - String dbName = props.getProperty("db.name", "JdbcConnectorSampleDb"); - - // For our sample, avoid a compile-time dependency to the jdbc driver. - // At runtime, require that the classpath can find it. - - String DERBY_DATA_SOURCE = "org.apache.derby.jdbc.EmbeddedDataSource"; - - Class<?> nsDataSource = null; - try { - nsDataSource = Class.forName(DERBY_DATA_SOURCE); - } - catch (ClassNotFoundException e) { - String msg = "Fix the test classpath. "; - if (System.getenv("DERBY_HOME") == null) { - msg += "DERBY_HOME not set. 
"; - } - msg += "Class not found: "+e.getLocalizedMessage(); - System.err.println(msg); - throw new IllegalStateException(msg); - } - DataSource ds = (DataSource) nsDataSource.newInstance(); - - @SuppressWarnings("rawtypes") - Class[] methodParams = new Class[] {String.class}; - Method dbname = nsDataSource.getMethod("setDatabaseName", methodParams); - Object[] args = new Object[] {dbName}; - dbname.invoke(ds, args); - - // create the db if necessary - Method create = nsDataSource.getMethod("setCreateDatabase", methodParams); - args = new Object[] {"create"}; - create.invoke(ds, args); - - // set the user - Method setuser = nsDataSource.getMethod("setUser", methodParams); - args = new Object[] { props.getProperty("db.user", System.getProperty("user.name")) }; - setuser.invoke(ds, args); - - // optionally set the pw - Method setpw = nsDataSource.getMethod("setPassword", methodParams); - args = new Object[] { props.getProperty("db.password") }; - if (args[0] != null) - setpw.invoke(ds, args); - - return ds; - } -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java deleted file mode 100644 index bb57629..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java +++ /dev/null @@ -1,37 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.edgent.samples.connectors.jdbc; - -/** - * A Person object for the sample. - */ -public class Person { - int id; - String firstName; - String lastName; - Person(int id, String first, String last) { - this.id = id; - this.firstName = first; - this.lastName = last; - } - public String toString() { - return String.format("id=%d first=%s last=%s", - id, firstName, lastName); - } -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java deleted file mode 100644 index f7f1211..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java +++ /dev/null @@ -1,96 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.edgent.samples.connectors.jdbc; - -import java.io.BufferedReader; -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.stream.Collectors; - -/** - * Utilities for loading the sample's person data. - */ -public class PersonData { - - /** - * Load the person data from the path specified by the "persondata.path" - * property. - * @param props configuration properties - * @return the loaded person data - * @throws Exception on failure - */ - public static List<Person> loadPersonData(Properties props) throws Exception { - String pathname = props.getProperty("persondata.path"); - List<Person> persons = new ArrayList<>(); - Path path = new File(pathname).toPath(); - try (BufferedReader br = Files.newBufferedReader(path)) { - int lineno = 0; - String line; - while ((line = br.readLine()) != null) { - lineno++; - Object[] fields = parseLine(line, lineno, pathname); - if (fields == null) - continue; - persons.add(new Person((Integer)fields[0], (String)fields[1], (String)fields[2])); - } - } - return persons; - } - - private static Object[] parseLine(String line, int lineno, String pathname) { - line = line.trim(); - if (line.startsWith("#")) - return null; - - // id,firstName,lastName - String[] items = line.split(","); - if (items.length < 3) - throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname); - int id; - try { - id = new Integer(items[0]); - if (id < 1) - throw new IllegalArgumentException("Invalid 
data on line "+lineno+" in "+pathname); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname); - } - - Object[] fields = new Object[3]; - fields[0] = id; - fields[1] = items[1].trim(); - fields[2] = items[2].trim(); - return fields; - } - - /** - * Convert a {@code List<Person>} to a {@code List<PersonId>} - * @param persons the person list - * @return the person id list - */ - public static List<PersonId> toPersonIds(List<Person> persons) { - return persons.stream() - .map(person -> new PersonId(person.id)) - .collect(Collectors.toList()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java deleted file mode 100644 index 218a027..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java +++ /dev/null @@ -1,32 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.edgent.samples.connectors.jdbc; - -/** - * Another class containing a person id for the sample. - */ -public class PersonId { - int id; - PersonId(int id) { - this.id = id; - } - public String toString() { - return String.format("id=%d", id); - } -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java deleted file mode 100644 index 006b459..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java +++ /dev/null @@ -1,102 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.edgent.samples.connectors.jdbc; - -import java.io.File; -import java.nio.file.Files; -import java.util.List; -import java.util.Properties; - -import org.apache.edgent.connectors.jdbc.JdbcStreams; -import org.apache.edgent.providers.direct.DirectProvider; -import org.apache.edgent.topology.TStream; -import org.apache.edgent.topology.Topology; - -/** - * A simple JDBC connector sample demonstrating streaming read access - * of a dbms table and creating stream tuples from the results. - */ -public class SimpleReaderApp { - private final Properties props; - - public static void main(String[] args) throws Exception { - if (args.length != 1) - throw new Exception("missing pathname to jdbc.properties file"); - SimpleReaderApp reader = new SimpleReaderApp(args[0]); - reader.run(); - } - - /** - * @param jdbcPropsPath pathname to properties file - */ - SimpleReaderApp(String jdbcPropsPath) throws Exception { - props = new Properties(); - props.load(Files.newBufferedReader(new File(jdbcPropsPath).toPath())); - } - - /** - * Create a topology for the writer application and run it. - */ - private void run() throws Exception { - DirectProvider tp = new DirectProvider(); - - // build the application/topology - - Topology t = tp.newTopology("jdbcSampleWriter"); - - // Create the JDBC connector - JdbcStreams myDb = new JdbcStreams(t, - () -> DbUtils.getDataSource(props), - dataSource -> dataSource.getConnection()); - - // Create a sample stream of tuples containing a person id - List<PersonId> personIdList = PersonData.toPersonIds(PersonData.loadPersonData(props)); - personIdList.add(new PersonId(99999)); - TStream<PersonId> personIds = t.collection(personIdList); - - // For each tuple on the stream, read info from the db table - // using the "id", and create a Person tuple on the result stream. 
- TStream<Person> persons = myDb.executeStatement(personIds, - () -> "SELECT id, firstname, lastname FROM persons WHERE id = ?", - (personId,stmt) -> stmt.setInt(1, personId.id), - (personId,rSet,exc,resultStream) -> { - if (exc != null) { - // some failure processing this tuple. an error was logged. - System.err.println("Unable to process id="+personId+": "+exc); - return; - } - if (rSet.next()) { - resultStream.accept( - new Person(rSet.getInt("id"), - rSet.getString("firstname"), - rSet.getString("lastname"))); - } - else { - System.err.println("Unknown person id="+personId.id); - } - } - ); - - // print out Person tuples as they are retrieved - persons.sink(person -> System.out.println("retrieved person: "+person)); - - // run the application / topology - tp.submit(t); - } -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java deleted file mode 100644 index 018c97b..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java +++ /dev/null @@ -1,85 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.edgent.samples.connectors.jdbc; - -import java.io.File; -import java.nio.file.Files; -import java.util.Properties; - -import org.apache.edgent.connectors.jdbc.JdbcStreams; -import org.apache.edgent.providers.direct.DirectProvider; -import org.apache.edgent.topology.TStream; -import org.apache.edgent.topology.Topology; - -/** - * A simple JDBC connector sample demonstrating streaming write access - * of a dbms to add stream tuples to a table. - */ -public class SimpleWriterApp { - private final Properties props; - - public static void main(String[] args) throws Exception { - if (args.length != 1) - throw new Exception("missing pathname to jdbc.properties file"); - SimpleWriterApp writer = new SimpleWriterApp(args[0]); - DbUtils.initDb(DbUtils.getDataSource(writer.props)); - writer.run(); - } - - /** - * @param jdbcPropsPath pathname to properties file - */ - SimpleWriterApp(String jdbcPropsPath) throws Exception { - props = new Properties(); - props.load(Files.newBufferedReader(new File(jdbcPropsPath).toPath())); - } - - /** - * Create a topology for the writer application and run it. 
- */ - private void run() throws Exception { - DirectProvider tp = new DirectProvider(); - - // build the application/topology - - Topology t = tp.newTopology("jdbcSampleWriter"); - - // Create the JDBC connector - JdbcStreams myDb = new JdbcStreams(t, - () -> DbUtils.getDataSource(props), - dataSource -> dataSource.getConnection()); - - // Create a sample stream of Person tuples - TStream<Person> persons = t.collection(PersonData.loadPersonData(props)); - - // Write stream tuples to a table. - myDb.executeStatement(persons, - () -> "INSERT INTO persons VALUES(?,?,?)", - (person,stmt) -> { - System.out.println("Inserting into persons table: person "+person); - stmt.setInt(1, person.id); - stmt.setString(2, person.firstName); - stmt.setString(3, person.lastName); - } - ); - - // run the application / topology - tp.submit(t); - } -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java deleted file mode 100644 index 4e88b77..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java +++ /dev/null @@ -1,32 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -/** - * Samples showing use of the - * <a href="{@docRoot}/org/apache/edgent/connectors/jdbc/package-summary.html"> - * JDBC stream connector</a>. - * <p> - * See {@code <edgent-release>/scripts/connectors/jdbc/README} to run the samples. - * <p> - * The following samples are provided: - * <ul> - * <li>SimpleReaderApp.java - a simple dbms reader application topology</li> - * <li>SimpleWriterApp.java - a simple dbms writer application topology</li> - * </ul> - */ -package org.apache.edgent.samples.connectors.jdbc; http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java deleted file mode 100644 index 7d5a530..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java +++ /dev/null @@ -1,144 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License.
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.edgent.samples.connectors.kafka; - -import org.apache.edgent.samples.connectors.Options; - -/** - * Demonstrate integrating with the Apache Kafka messaging system - * <a href="http://kafka.apache.org">http://kafka.apache.org</a>. - * <p> - * {@link org.apache.edgent.connectors.kafka.KafkaProducer KafkaProducer} is - * a connector used to create a bridge between topology streams - * and publishing to Kafka topics. - * <p> - * {@link org.apache.edgent.connectors.kafka.KafkaConsumer KafkaConsumer} is - * a connector used to create a bridge between topology streams - * and subscribing to Kafka topics. - * <p> - * The client either publishes messages to a topic or - * subscribes to the topic and reports the messages received. - * <p> - * By default, a running Kafka cluster with the following - * characteristics is assumed: - * <ul> - * <li>{@code bootstrap.servers="localhost:9092"}</li> - * <li>{@code zookeeper.connect="localhost:2181"}</li> - * <li>kafka topic {@code "kafkaSampleTopic"} exists</li> - * </ul> - * <p> - * See the Apache Kafka link above for information about setting up a Kafka - * cluster as well as creating a topic. - * <p> - * This may be executed from as: - * <UL> - * <LI> - * {@code java -cp samples/lib/org.apache.edgent.samples.connectors.kafka.jar - * org.apache.edgent.samples.connectors.kafka.KafkaClient -h - * } - Run directly from the command line. 
- * </LI> - * </UL> - * <UL> - * <LI> - * An application execution within your IDE once you set the class path to include the correct jars.</LI> - * </UL> - */ -public class KafkaClient { - private static final String usage = "usage: " - + "\n" + "[-v] [-h]" - + "\n" + "pub | sub" - + "\n" + "[bootstrap.servers=<value>]" - + "\n" + "[zookeeper.connect=<value>]" - + "\n" + "[group.id=<value>]" - + "\n" + "[pubcnt=<value>]" - ; - - public static void main(String[] args) throws Exception { - Options options = processArgs(args); - if (options == null) - return; - - Runner.run(options); - } - - private static Options processArgs(String[] args) { - Options options = new Options(); - initHandlers(options); - try { - options.processArgs(args); - } - catch (Exception e) { - System.err.println(e); - System.out.println(usage); - return null; - } - - if ((Boolean)options.get(OPT_HELP)) { - System.out.println(usage); - return null; - } - - if (!(Boolean)options.get(OPT_PUB) && !(Boolean)options.get(OPT_SUB)) { - System.err.println(String.format("Missing argument '%s' or '%s'.", OPT_PUB, OPT_SUB)); - System.out.println(usage); - return null; - } - - String[] announceOpts = new String[] { - }; - if ((Boolean)options.get(OPT_VERBOSE)) - announceOpts = options.getAll().stream().map(e -> e.getKey()).toArray(String[]::new); - for (String opt : announceOpts) { - Object value = options.get(opt); - if (value != null) { - if (opt.toLowerCase().contains("password")) - value = "*****"; - System.out.println("Using "+opt+"="+value); - } - } - - return options; - } - - static final String OPT_VERBOSE = "-v"; - static final String OPT_HELP = "-h"; - static final String OPT_PUB = "pub"; - static final String OPT_SUB = "sub"; - static final String OPT_BOOTSTRAP_SERVERS = "bootstrap.servers"; - static final String OPT_ZOOKEEPER_CONNECT = "zookeeper.connect"; - static final String OPT_GROUP_ID = "group.id"; - static final String OPT_TOPIC = "topic"; - static final String OPT_PUB_CNT = "pubcnt"; - 
- private static void initHandlers(Options opts) { - // options for which we have a default - opts.addHandler(OPT_HELP, null, false); - opts.addHandler(OPT_VERBOSE, null, false); - opts.addHandler(OPT_PUB, null, false); - opts.addHandler(OPT_SUB, null, false); - opts.addHandler(OPT_BOOTSTRAP_SERVERS, v -> v, "localhost:9092"); - opts.addHandler(OPT_ZOOKEEPER_CONNECT, v -> v, "localhost:2181"); - opts.addHandler(OPT_TOPIC, v -> v, "kafkaSampleTopic"); - opts.addHandler(OPT_PUB_CNT, v -> Integer.valueOf(v), -1); - - // optional options (no default value) - opts.addHandler(OPT_GROUP_ID, v -> v); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java deleted file mode 100644 index 6746a7d..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java +++ /dev/null @@ -1,81 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.edgent.samples.connectors.kafka; - -import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_BOOTSTRAP_SERVERS; -import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_PUB_CNT; -import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_TOPIC; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.edgent.samples.connectors.MsgSupplier; -import org.apache.edgent.samples.connectors.Options; -import org.apache.edgent.topology.TStream; -import org.apache.edgent.topology.Topology; -import org.apache.edgent.topology.TopologyProvider; - -import org.apache.edgent.connectors.kafka.KafkaProducer; - -/** - * A Kafka producer/publisher topology application. - */ -public class PublisherApp { - private final TopologyProvider tp; - private final Options options; - - /** - * @param tp the TopologyProvider to use. - * @param options - */ - PublisherApp(TopologyProvider tp, Options options) { - this.tp = tp; - this.options = options; - } - - /** - * Create a topology for the publisher application. - * @return the Topology - */ - public Topology buildAppTopology() { - Topology t = tp.newTopology("kafkaClientPublisher"); - - // Create a sample stream of tuples to publish - TStream<String> msgs = t.poll(new MsgSupplier(options.get(OPT_PUB_CNT)), - 1L, TimeUnit.SECONDS); - - // Create the KafkaProducer broker connector - Map<String,Object> config = newConfig(); - KafkaProducer kafka = new KafkaProducer(t, () -> config); - - // Publish the stream to the topic. The String tuple is the message value. 
- kafka.publish(msgs, options.get(OPT_TOPIC)); - - return t; - } - - private Map<String,Object> newConfig() { - Map<String,Object> config = new HashMap<>(); - // required kafka configuration items - config.put("bootstrap.servers", options.get(OPT_BOOTSTRAP_SERVERS)); - return config; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README deleted file mode 100644 index 6554f8b..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README +++ /dev/null @@ -1,26 +0,0 @@ -Sample Kafka Publisher and Subscriber topology applications. - -By default the samples assume the following kafka broker configuration: -- bootstrap.servers="localhost:9092" -- zookeeper.connect="localhost:2181" -- kafka topic "kafkaSampleTopic" exists -- no authentication - -See http://kafka.apache.org for the code and setup information for -a Kafka broker. 
- -see scripts/connectors/kafka/README to run them - -The simple sample ------------------ - -SimplePublisherApp.java - build and run the simple publisher application topology -SimpleSubscriberApp.java - build and run the simple subscriber application topology - -The fully configurable client ------------------------------ - -Runner.java - build and run the publisher or subscriber -PublisherApp.java - build the publisher application topology -SubscriberApp.java - build the subscriber application topology -KafkaClient.java - the client's command line interface http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java deleted file mode 100644 index 2ffccfc..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java +++ /dev/null @@ -1,68 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.edgent.samples.connectors.kafka; - -import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_BOOTSTRAP_SERVERS; -import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_PUB; -import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_TOPIC; -import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_ZOOKEEPER_CONNECT; - -import org.apache.edgent.console.server.HttpServer; -import org.apache.edgent.providers.development.DevelopmentProvider; -import org.apache.edgent.samples.connectors.Options; -import org.apache.edgent.topology.Topology; - -/** - * Build and run the publisher or subscriber application. - */ -public class Runner { - /** - * Build and run the publisher or subscriber application. - * @param options command line options - * @throws Exception on failure - */ - public static void run(Options options) throws Exception { - boolean isPub = options.get(OPT_PUB); - - // Get a topology runtime provider - DevelopmentProvider tp = new DevelopmentProvider(); - - Topology top; - if (isPub) { - PublisherApp publisher = new PublisherApp(tp, options); - top = publisher.buildAppTopology(); - } - else { - SubscriberApp subscriber = new SubscriberApp(tp, options); - top = subscriber.buildAppTopology(); - } - - // Submit the app/topology; send or receive the messages. - System.out.println( - "Using Kafka cluster at bootstrap.servers=" - + options.get(OPT_BOOTSTRAP_SERVERS) - + " zookeeper.connect=" + options.get(OPT_ZOOKEEPER_CONNECT) - + "\n" + (isPub ? 
"Publishing" : "Subscribing") - + " to topic " + options.get(OPT_TOPIC)); - System.out.println("Console URL for the job: " - + tp.getServices().getService(HttpServer.class).getConsoleUrl()); - tp.submit(top); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java deleted file mode 100644 index a8b9492..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java +++ /dev/null @@ -1,99 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.edgent.samples.connectors.kafka; - -import java.io.File; -import java.nio.file.Files; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.edgent.console.server.HttpServer; -import org.apache.edgent.providers.development.DevelopmentProvider; -import org.apache.edgent.samples.connectors.Util; -import org.apache.edgent.topology.TStream; -import org.apache.edgent.topology.Topology; - -import org.apache.edgent.connectors.kafka.KafkaProducer; - -/** - * A simple Kafka publisher topology application. - */ -public class SimplePublisherApp { - private final Properties props; - private final String topic; - - public static void main(String[] args) throws Exception { - if (args.length != 1) - throw new Exception("missing pathname to kafka.properties file"); - SimplePublisherApp publisher = new SimplePublisherApp(args[0]); - publisher.run(); - } - - /** - * @param kafkaPropsPath pathname to properties file - */ - SimplePublisherApp(String kafkaPropsPath) throws Exception { - props = new Properties(); - props.load(Files.newBufferedReader(new File(kafkaPropsPath).toPath())); - topic = props.getProperty("topic"); - } - - private Map<String,Object> createKafkaConfig() { - Map<String,Object> kafkaConfig = new HashMap<>(); - kafkaConfig.put("bootstrap.servers", props.get("bootstrap.servers")); - return kafkaConfig; - } - - /** - * Create a topology for the publisher application and run it. 
- */ - private void run() throws Exception { - DevelopmentProvider tp = new DevelopmentProvider(); - - // build the application/topology - - Topology t = tp.newTopology("kafkaSamplePublisher"); - - // Create the Kafka Producer broker connector - Map<String,Object> kafkaConfig = createKafkaConfig(); - KafkaProducer kafka = new KafkaProducer(t, () -> kafkaConfig); - - // Create a sample stream of tuples to publish - AtomicInteger cnt = new AtomicInteger(); - TStream<String> msgs = t.poll( - () -> { - String msg = String.format("Message-%d from %s", - cnt.incrementAndGet(), Util.simpleTS()); - System.out.println("poll generated msg to publish: " + msg); - return msg; - }, 1L, TimeUnit.SECONDS); - - // Publish the stream to the topic. The String tuple is the message value. - kafka.publish(msgs, topic); - - // run the application / topology - System.out.println("Console URL for the job: " - + tp.getServices().getService(HttpServer.class).getConsoleUrl()); - tp.submit(t); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java deleted file mode 100644 index 7cef424..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java +++ /dev/null @@ -1,95 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.edgent.samples.connectors.kafka; - -import java.io.File; -import java.nio.file.Files; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import org.apache.edgent.console.server.HttpServer; -import org.apache.edgent.providers.development.DevelopmentProvider; -import org.apache.edgent.samples.connectors.Util; -import org.apache.edgent.topology.TStream; -import org.apache.edgent.topology.Topology; - -import org.apache.edgent.connectors.kafka.KafkaConsumer; - -/** - * A simple Kafka subscriber topology application. 
- */ -public class SimpleSubscriberApp { - private final Properties props; - private final String topic; - - public static void main(String[] args) throws Exception { - if (args.length != 1) - throw new Exception("missing pathname to kafka.properties file"); - SimpleSubscriberApp subscriber = new SimpleSubscriberApp(args[0]); - subscriber.run(); - } - - /** - * @param kafkaPropsPath pathname to properties file - */ - SimpleSubscriberApp(String kafkaPropsPath) throws Exception { - props = new Properties(); - props.load(Files.newBufferedReader(new File(kafkaPropsPath).toPath())); - topic = props.getProperty("topic"); - } - - private Map<String,Object> createKafkaConfig() { - Map<String,Object> kafkaConfig = new HashMap<>(); - kafkaConfig.put("zookeeper.connect", props.get("zookeeper.connect")); - // for the sample, be insensitive to old/multiple consumers for - // the topic/groupId hanging around - kafkaConfig.put("group.id", - "kafkaSampleConsumer_" + Util.simpleTS().replaceAll(":", "")); - return kafkaConfig; - } - - /** - * Create a topology for the subscriber application and run it. 
- */ - private void run() throws Exception { - DevelopmentProvider tp = new DevelopmentProvider(); - - // build the application/topology - - Topology t = tp.newTopology("kafkaSampleSubscriber"); - - // Create the Kafka Consumer broker connector - Map<String,Object> kafkaConfig = createKafkaConfig(); - KafkaConsumer kafka = new KafkaConsumer(t, () -> kafkaConfig); - - // Subscribe to the topic and create a stream of messages - TStream<String> msgs = kafka.subscribe(rec -> rec.value(), topic); - - // Process the received msgs - just print them out - msgs.sink(tuple -> System.out.println( - String.format("[%s] received: %s", Util.simpleTS(), tuple))); - - // run the application / topology - System.out.println("Console URL for the job: " - + tp.getServices().getService(HttpServer.class).getConsoleUrl()); - tp.submit(t); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SubscriberApp.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SubscriberApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SubscriberApp.java deleted file mode 100644 index 7405f39..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SubscriberApp.java +++ /dev/null @@ -1,91 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.edgent.samples.connectors.kafka; - -import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_GROUP_ID; -import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_TOPIC; -import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_ZOOKEEPER_CONNECT; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.edgent.samples.connectors.Options; -import org.apache.edgent.samples.connectors.Util; -import org.apache.edgent.topology.TStream; -import org.apache.edgent.topology.Topology; -import org.apache.edgent.topology.TopologyProvider; - -import org.apache.edgent.connectors.kafka.KafkaConsumer; - -/** - * A Kafka consumer/subscriber topology application. - */ -public class SubscriberApp { - private final TopologyProvider tp; - private final Options options; - private final String uniq = Util.simpleTS(); - - /** - * @param top the TopologyProvider to use. - * @param options - */ - SubscriberApp(TopologyProvider tp, Options options) { - this.tp = tp; - this.options = options; - } - - /** - * Create a topology for the subscriber application. 
- * @return the Topology - */ - public Topology buildAppTopology() { - Topology t = tp.newTopology("kafkaClientSubscriber"); - - // Create the KafkaConsumer broker connector - Map<String,Object> config = newConfig(t); - KafkaConsumer kafka = new KafkaConsumer(t, () -> config); - - System.out.println("Using Kafka consumer group.id " - + config.get(OPT_GROUP_ID)); - - // Subscribe to the topic and create a stream of messages - TStream<String> msgs = kafka.subscribe(rec -> rec.value(), - (String)options.get(OPT_TOPIC)); - - // Process the received msgs - just print them out - msgs.sink(tuple -> System.out.println( - String.format("[%s] received: %s", Util.simpleTS(), tuple))); - - return t; - } - - private Map<String,Object> newConfig(Topology t) { - Map<String,Object> config = new HashMap<>(); - // required kafka configuration items - config.put("zookeeper.connect", options.get(OPT_ZOOKEEPER_CONNECT)); - config.put("group.id", options.get(OPT_GROUP_ID, newGroupId(t.getName()))); - return config; - } - - private String newGroupId(String name) { - // be insensitive to old consumers for the topic/groupId hanging around - String groupId = name + "_" + uniq.replaceAll(":", ""); - return groupId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/package-info.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/package-info.java deleted file mode 100644 index 761d053..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/package-info.java +++ /dev/null @@ -1,35 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/** - * Samples showing use of the - * <a href="{@docRoot}/org/apache/edgent/connectors/kafka/package-summary.html"> - * Apache Kafka stream connector</a>. - * <p> - * See {@code <edgent-release>/scripts/connectors/kafka/README} to run the samples. - * <p> - * The following simple samples are provided: - * <ul> - * <li>SimplePublisherApp.java - a simple publisher application topology</li> - * <li>SimpleSubscriberApp.java - a simple subscriber application topology</li> - * </ul> - * The remaining classes are part of a sample that more fully exposes - * controlling various configuration options.
- */ -package org.apache.edgent.samples.connectors.kafka; http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/MqttClient.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/MqttClient.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/MqttClient.java deleted file mode 100644 index 9cf6c37..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/MqttClient.java +++ /dev/null @@ -1,183 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.edgent.samples.connectors.mqtt; - -import org.apache.edgent.samples.connectors.Options; - -/** - * Demonstrate integrating with the MQTT messaging system - * <a href="http://mqtt.org">http://mqtt.org</a>. - * <p> - * {@link org.apache.edgent.connectors.mqtt.MqttStreams MqttStreams} is - * a connector used to create a bridge between topology streams - * and an MQTT broker. - * <p> - * The client either publishes some messages to a MQTT topic - * or subscribes to the topic and reports the messages received. 
- * <p> - * By default, a running MQTT broker with the following - * characteristics is assumed: - * <ul> - * <li>the broker's connection is {@code tcp://localhost:1883}</li> - * <li>the broker is configured for no authentication</li> - * </ul> - * <p> - * See the MQTT link above for information about setting up a MQTT broker. - * <p> - * This may be executed as: - * <UL> - * <LI> - * {@code java -cp samples/lib/org.apache.edgent.samples.connectors.mqtt.jar - * org.apache.edgent.samples.connectors.mqtt.MqttClient -h - * } - Run directly from the command line. - * </LI> - * <LI> - * Specify absolute pathnames if using the {@code trustStore} - * or {@code keyStore} arguments. - * </LI> - * <LI> - * An application execution within your IDE once you set the class path to include the correct jars. - * </LI> - * </UL> - */ -public class MqttClient { - private static final String usage = "usage: " - + "\n" + "[-v] [-h]" - + "\n" + "pub | sub" - + "\n" + "[serverURI=<value>]" - + "\n" + "[clientId=<value>]" - + "\n" + "[cleanSession=<true|false>]" - + "\n" + "[topic=<value>] [qos=<value>]" - + "\n" + "[retain]" - + "\n" + "[pubcnt=<value>]" - + "\n" + "[cnTimeout=<value>]" - + "\n" + "[actionTimeoutMillis=<value>]" - + "\n" + "[idleTimeout=<value>]" - + "\n" + "[idleReconnectInterval=<value>]" - + "\n" + "[userID=<value>] [password=<value>]" - + "\n" + "[trustStore=<value>] [trustStorePassword=<value>]" - + "\n" + "[keyStore=<value>] [keyStorePassword=<value>]" - ; - - public static void main(String[] args) throws Exception { - Options options = processArgs(args); - if (options == null) - return; - - Runner.run(options); - } - - private static Options processArgs(String[] args) { - Options options = new Options(); - initHandlers(options); - try { - options.processArgs(args); - } - catch (Exception e) { - System.err.println(e); - System.out.println(usage); - return null; - } - - if ((Boolean)options.get(OPT_HELP)) { - System.out.println(usage); - return null; - } - - if 
(!(Boolean)options.get(OPT_PUB) && !(Boolean)options.get(OPT_SUB)) { - System.err.println(String.format("Missing argument '%s' or '%s'.", OPT_PUB, OPT_SUB)); - System.out.println(usage); - return null; - } - - if (options.get(OPT_PASSWORD) != null) - options.put(OPT_USER_ID, options.get(OPT_USER_ID, System.getProperty("user.name"))); - - String[] announceOpts = new String[] { - OPT_USER_ID, - OPT_PASSWORD, - OPT_TRUST_STORE, - OPT_TRUST_STORE_PASSWORD, - OPT_KEY_STORE, - OPT_KEY_STORE_PASSWORD - }; - if ((Boolean)options.get(OPT_VERBOSE)) - announceOpts = options.getAll().stream().map(e -> e.getKey()).toArray(String[]::new); - for (String opt : announceOpts) { - Object value = options.get(opt); - if (value != null) { - if (opt.toLowerCase().contains("password")) - value = "*****"; - System.out.println("Using "+opt+"="+value); - } - } - - return options; - } - - static final String OPT_VERBOSE = "-v"; - static final String OPT_HELP = "-h"; - static final String OPT_PUB = "pub"; - static final String OPT_SUB = "sub"; - static final String OPT_SERVER_URI = "serverURI"; - static final String OPT_CLIENT_ID = "clientId"; - static final String OPT_CN_TIMEOUT_SEC = "cnTimeout"; - static final String OPT_ACTION_TIMEOUT_MILLIS = "actionTimeoutMillis"; - static final String OPT_QOS = "qos"; - static final String OPT_TOPIC = "topic"; - static final String OPT_CLEAN_SESSION = "cleanSession"; - static final String OPT_RETAIN = "retain"; - static final String OPT_USER_ID = "userID"; - static final String OPT_PASSWORD = "password"; - static final String OPT_TRUST_STORE = "trustStore"; - static final String OPT_TRUST_STORE_PASSWORD = "trustStorePassword"; - static final String OPT_KEY_STORE = "keyStore"; - static final String OPT_KEY_STORE_PASSWORD = "keyStorePassword"; - static final String OPT_PUB_CNT = "pubcnt"; - static final String OPT_IDLE_TIMEOUT_SEC = "idleTimeout"; - static final String OPT_IDLE_RECONNECT_INTERVAL_SEC = "idleReconnectInterval"; - - private static void 
initHandlers(Options opts) { - // options for which we have a default - opts.addHandler(OPT_HELP, null, false); - opts.addHandler(OPT_VERBOSE, null, false); - opts.addHandler(OPT_PUB, null, false); - opts.addHandler(OPT_SUB, null, false); - opts.addHandler(OPT_SERVER_URI, v -> v, "tcp://localhost:1883"); - opts.addHandler(OPT_TOPIC, v -> v, "mqttSampleTopic"); - opts.addHandler(OPT_RETAIN, null, false); - opts.addHandler(OPT_PUB_CNT, v -> Integer.valueOf(v), -1); - opts.addHandler(OPT_QOS, v -> Integer.valueOf(v), 0); - - // optional options (no default value) - opts.addHandler(OPT_CLIENT_ID, v -> v); - opts.addHandler(OPT_CN_TIMEOUT_SEC, v -> Integer.valueOf(v)); - opts.addHandler(OPT_ACTION_TIMEOUT_MILLIS, v -> Long.valueOf(v)); - opts.addHandler(OPT_CLEAN_SESSION, v -> Boolean.valueOf(v)); - opts.addHandler(OPT_USER_ID, v -> v); - opts.addHandler(OPT_PASSWORD, v -> v); - opts.addHandler(OPT_TRUST_STORE, v -> v); - opts.addHandler(OPT_TRUST_STORE_PASSWORD, v -> v); - opts.addHandler(OPT_KEY_STORE, v -> v); - opts.addHandler(OPT_KEY_STORE_PASSWORD, v -> v); - opts.addHandler(OPT_IDLE_TIMEOUT_SEC, v -> Integer.valueOf(v)); - opts.addHandler(OPT_IDLE_RECONNECT_INTERVAL_SEC, v -> Integer.valueOf(v)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/PublisherApp.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/PublisherApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/PublisherApp.java deleted file mode 100644 index 4be6ce5..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/PublisherApp.java +++ /dev/null @@ -1,74 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.edgent.samples.connectors.mqtt; - -import static org.apache.edgent.samples.connectors.mqtt.MqttClient.OPT_PUB_CNT; -import static org.apache.edgent.samples.connectors.mqtt.MqttClient.OPT_QOS; -import static org.apache.edgent.samples.connectors.mqtt.MqttClient.OPT_RETAIN; -import static org.apache.edgent.samples.connectors.mqtt.MqttClient.OPT_TOPIC; - -import java.util.concurrent.TimeUnit; - -import org.apache.edgent.connectors.mqtt.MqttConfig; -import org.apache.edgent.connectors.mqtt.MqttStreams; -import org.apache.edgent.samples.connectors.MsgSupplier; -import org.apache.edgent.samples.connectors.Options; -import org.apache.edgent.topology.TStream; -import org.apache.edgent.topology.Topology; -import org.apache.edgent.topology.TopologyProvider; - -/** - * A MQTT publisher topology application. - */ -public class PublisherApp { - private final TopologyProvider tp; - private final Options options; - - /** - * @param tp the TopologyProvider to use. - * @param options - */ - PublisherApp(TopologyProvider tp, Options options) { - this.tp = tp; - this.options = options; - } - - /** - * Create a topology for the publisher application. 
- * @return the Topology - */ - public Topology buildAppTopology() { - Topology t = tp.newTopology("mqttClientPublisher"); - - // Create a sample stream of tuples to publish - TStream<String> msgs = t.poll(new MsgSupplier(options.get(OPT_PUB_CNT)), - 1L, TimeUnit.SECONDS); - - // Create the MQTT broker connector - MqttConfig config= Runner.newConfig(options); - MqttStreams mqtt = new MqttStreams(t, () -> config); - - // Publish the stream to the topic. The String tuple is the message value. - mqtt.publish(msgs, options.get(OPT_TOPIC), - options.get(OPT_QOS), options.get(OPT_RETAIN)); - - return t; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/a7aeb2b4/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/README ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/README b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/README deleted file mode 100644 index 7760f50..0000000 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/mqtt/README +++ /dev/null @@ -1,24 +0,0 @@ -Sample MQTT Publisher and Subscriber topology applications. - -By default, the following MQTT broker configuration is assumed: -- the broker's connection URL is tcp://localhost:1883 -- the broker is configured for no authentication - -See http://mqtt.org for the code and setup information for -a mqtt broker. 
- -See scripts/connectors/mqtt/README for how to run the samples. - -The simple sample - ----------------- - -SimplePublisherApp.java - build and run the simple publisher application topology -SimpleSubscriberApp.java - build and run the simple subscriber application topology - -The fully configurable clients - ------------------------------ - -Runner.java - build and run the publisher or subscriber -PublisherApp.java - build the publisher application topology -SubscriberApp.java - build the subscriber application topology -MqttClient.java - the client's command line interface