This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 5c610a1 STORM-3462: examples-hive: fix all checkstyle warnings new efb4047 Merge pull request #3080 from krichter722/checkstyle-hive-examples 5c610a1 is described below commit 5c610a1f110245a70e29c0e4c85d253c37b7e2be Author: Karl-Philipp Richter <krich...@posteo.de> AuthorDate: Sat Jul 6 18:18:47 2019 +0200 STORM-3462: examples-hive: fix all checkstyle warnings --- examples/storm-hive-examples/pom.xml | 2 +- .../storm/hive/bolt/BucketTestHiveTopology.java | 50 ++++++++++++---------- .../org/apache/storm/hive/bolt/HiveTopology.java | 19 ++++---- .../storm/hive/bolt/HiveTopologyPartitioned.java | 19 ++++---- .../storm/hive/trident/TridentHiveTopology.java | 27 ++++++------ 5 files changed, 61 insertions(+), 56 deletions(-) diff --git a/examples/storm-hive-examples/pom.xml b/examples/storm-hive-examples/pom.xml index 7fe757d..10e2112 100644 --- a/examples/storm-hive-examples/pom.xml +++ b/examples/storm-hive-examples/pom.xml @@ -97,7 +97,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>67</maxAllowedViolations> + <maxAllowedViolations>0</maxAllowedViolations> </configuration> </plugin> <plugin> diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java index 85bbde7..61ea97c 100644 --- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java +++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java @@ -48,33 +48,33 @@ public class BucketTestHiveTopology { public static void main(String[] args) throws Exception { if ((args == null) || (args.length < 7)) { System.out.println("Usage: BucketTestHiveTopology metastoreURI " - + "dbName tableName dataFileLocation hiveBatchSize " + - "hiveTickTupl]eIntervalSecs workers [topologyNamey] [keytab file]" + + "dbName tableName dataFileLocation hiveBatchSize " + + "hiveTickTupl]eIntervalSecs workers [topologyNamey] [keytab file]" + " [principal name] "); System.exit(1); } - String metaStoreURI = args[0]; + String metaStoreUri = args[0]; String dbName = args[1]; String tblName = args[2]; - String sourceFileLocation = args[3]; Integer hiveBatchSize = Integer.parseInt(args[4]); Integer hiveTickTupleIntervalSecs = Integer.parseInt(args[5]); Integer workers = Integer.parseInt(args[6]); - String[] colNames = { "ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", - "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", - "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", - "ss_wholesale_cost", "ss_list_price", "ss_sales_price", - "ss_ext_discount_amt", "ss_ext_sales_price", - "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", - "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", - "ss_net_profit" }; + String[] colNames = { + "ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", + "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", + "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", + "ss_wholesale_cost", "ss_list_price", "ss_sales_price", + "ss_ext_discount_amt", "ss_ext_sales_price", + "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", + "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", + "ss_net_profit" + }; Config config = new Config(); config.setNumWorkers(workers); - UserDataSpout spout = new UserDataSpout().withDataFile(sourceFileLocation); DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)).withTimeAsPartitionField("yyyy/MM/dd"); HiveOptions hiveOptions; - hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) + hiveOptions = new HiveOptions(metaStoreUri,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(hiveBatchSize); // doing below because its affecting storm metrics most likely @@ -87,6 +87,8 @@ public class BucketTestHiveTopology { } HiveBolt hiveBolt = new HiveBolt(hiveOptions); TopologyBuilder builder = new TopologyBuilder(); + String sourceFileLocation = args[3]; + UserDataSpout spout = new UserDataSpout().withDataFile(sourceFileLocation); builder.setSpout(USER_SPOUT_ID, spout, 1); // SentenceSpout --> MyBolt builder.setBolt(BOLT_ID, hiveBolt, 14) @@ -105,16 +107,18 @@ public class BucketTestHiveTopology { private BufferedReader br; private int count = 0; private long total = 0L; - private String[] outputFields = { "ss_sold_date_sk", "ss_sold_time_sk", - "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", - "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", - "ss_quantity", "ss_wholesale_cost", "ss_list_price", - "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", - "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", - "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", - "ss_net_profit" }; + private String[] outputFields = { + "ss_sold_date_sk", "ss_sold_time_sk", + "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", + "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", + "ss_quantity", "ss_wholesale_cost", "ss_list_price", + "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", + "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", + "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", + "ss_net_profit" + }; - public UserDataSpout withDataFile (String filePath) { + public UserDataSpout withDataFile(String filePath) { this.filePath = filePath; return this; } diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java index 5dbb036..46f6d6d 100644 --- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java +++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java @@ -41,7 +41,7 @@ public class HiveTopology { static final String TOPOLOGY_NAME = "hive-test-topology1"; public static void main(String[] args) throws Exception { - String metaStoreURI = args[0]; + String metaStoreUri = args[0]; String dbName = args[1]; String tblName = args[2]; String[] colNames = {"id","name","phone","street","city","state"}; @@ -52,14 +52,14 @@ public class HiveTopology { .withColumnFields(new Fields(colNames)); HiveOptions hiveOptions; if (args.length == 6) { - hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) + hiveOptions = new HiveOptions(metaStoreUri,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(100) .withIdleTimeout(10) .withKerberosKeytab(args[4]) .withKerberosPrincipal(args[5]); } else { - hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) + hiveOptions = new HiveOptions(metaStoreUri,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(100) .withIdleTimeout(10) @@ -74,7 +74,7 @@ public class HiveTopology { .shuffleGrouping(USER_SPOUT_ID); String topoName = TOPOLOGY_NAME; - if(args.length >= 4) { + if (args.length >= 4) { topoName = args[3]; } StormSubmitter.submitTopology(topoName, config, builder.createTopology()); @@ -84,6 +84,7 @@ public class HiveTopology { try { Thread.sleep(seconds * 1000); } catch (InterruptedException e) { + //ignore } } @@ -91,10 +92,10 @@ public class HiveTopology { private ConcurrentHashMap<UUID, Values> pending; private SpoutOutputCollector collector; private String[] sentences = { - "1,user1,123456,street1,sunnyvale,ca", - "2,user2,123456,street2,sunnyvale,ca", - "3,user3,123456,street3,san jose,ca", - "4,user4,123456,street4,san jose,ca", + "1,user1,123456,street1,sunnyvale,ca", + "2,user2,123456,street2,sunnyvale,ca", + "3,user3,123456,street3,san jose,ca", + "4,user4,123456,street4,san jose,ca", }; private int index = 0; private int count = 0; @@ -125,7 +126,7 @@ public class HiveTopology { } count++; total++; - if(count > 1000){ + if (count > 1000) { count = 0; System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total); } diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java index 6af50de..4d31497 100644 --- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java +++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java @@ -42,7 +42,7 @@ public class HiveTopologyPartitioned { static final String TOPOLOGY_NAME = "hive-test-topology-partitioned"; public static void main(String[] args) throws Exception { - String metaStoreURI = args[0]; + String metaStoreUri = args[0]; String dbName = args[1]; String tblName = args[2]; String[] partNames = {"city","state"}; @@ -55,14 +55,14 @@ public class HiveTopologyPartitioned { .withPartitionFields(new Fields(partNames)); HiveOptions hiveOptions; if (args.length == 6) { - hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) + hiveOptions = new HiveOptions(metaStoreUri,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(1000) .withIdleTimeout(10) .withKerberosKeytab(args[4]) .withKerberosPrincipal(args[5]); } else { - hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) + hiveOptions = new HiveOptions(metaStoreUri,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(1000) .withIdleTimeout(10); @@ -85,6 +85,7 @@ public class HiveTopologyPartitioned { try { Thread.sleep(seconds * 1000); } catch (InterruptedException e) { + //ignore } } @@ -92,10 +93,10 @@ public class HiveTopologyPartitioned { private ConcurrentHashMap<UUID, Values> pending; private SpoutOutputCollector collector; private String[] sentences = { - "1,user1,123456,street1,sunnyvale,ca", - "2,user2,123456,street2,sunnyvale,ca", - "3,user3,123456,street3,san jose,ca", - "4,user4,123456,street4,san jose,ca", + "1,user1,123456,street1,sunnyvale,ca", + "2,user2,123456,street2,sunnyvale,ca", + "3,user3,123456,street3,san jose,ca", + "4,user4,123456,street4,san jose,ca", }; private int index = 0; private int count = 0; @@ -126,8 +127,8 @@ public class HiveTopologyPartitioned { } count++; total++; - if(count > 1000){ - Utils.sleep(1000); + if (count > 1000) { + Utils.sleep(1000); count = 0; System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total); } diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java index f2e578e..bb3e00c 100644 --- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java +++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/trident/TridentHiveTopology.java @@ -18,7 +18,6 @@ package org.apache.storm.hive.trident; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -42,11 +41,10 @@ import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class TridentHiveTopology { private static final Logger LOG = LoggerFactory.getLogger(TridentHiveTopology.class); - public static StormTopology buildTopology(String metaStoreURI, String dbName, String tblName, Object keytab, Object principal) { + public static StormTopology buildTopology(String metaStoreUri, String dbName, String tblName, Object keytab, Object principal) { int batchSize = 100; FixedBatchSpout spout = new FixedBatchSpout(batchSize); spout.setCycle(true); @@ -60,7 +58,7 @@ public class TridentHiveTopology { .withPartitionFields(new Fields(partNames)); HiveOptions hiveOptions; if (keytab != null && principal != null) { - hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) + hiveOptions = new HiveOptions(metaStoreUri,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(batchSize) .withIdleTimeout(10) @@ -68,7 +66,7 @@ public class TridentHiveTopology { .withKerberosKeytab((String)keytab) .withKerberosPrincipal((String)principal); } else { - hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) + hiveOptions = new HiveOptions(metaStoreUri,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(batchSize) .withCallTimeout(30000) @@ -83,11 +81,12 @@ public class TridentHiveTopology { try { Thread.sleep(seconds * 1000); } catch (InterruptedException e) { + //ignore } } public static void main(String[] args) throws Exception { - String metaStoreURI = args[0]; + String metaStoreUri = args[0]; String dbName = args[1]; String tblName = args[2]; Config conf = new Config(); @@ -108,8 +107,8 @@ public class TridentHiveTopology { } try { - StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreURI, dbName, tblName,null,null)); - } catch(SubmitterHookException e) { + StormSubmitter.submitTopology(args[3], conf, buildTopology(metaStoreUri, dbName, tblName,null,null)); + } catch (SubmitterHookException e) { LOG.warn("Topology is submitted but invoking ISubmitterHook failed", e); } catch (Exception e) { LOG.warn("Failed to submit topology ", e); @@ -149,20 +148,20 @@ public class TridentHiveTopology { @Override public void emitBatch(long batchId, TridentCollector collector) { List<List<Object>> batch = this.batches.get(batchId); - if(batch == null){ + if (batch == null) { batch = new ArrayList<List<Object>>(); - if(index>=outputs.length && cycle) { + if (index >= outputs.length && cycle) { index = 0; } - for(int i=0; i < maxBatchSize; index++, i++) { - if(index == outputs.length){ - index=0; + for (int i = 0; i < maxBatchSize; index++, i++) { + if (index == outputs.length) { + index = 0; } batch.add(outputs[index]); } this.batches.put(batchId, batch); } - for(List<Object> list : batch){ + for (List<Object> list : batch) { collector.emit(list); } }