Refactored to move the duplicated index/type/id resolution code into new protected methods (getIndex, getType, getId) in ElasticsearchPersistWriter
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bf87552 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bf87552 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bf87552 Branch: refs/heads/STREAMS-170 Commit: 5bf8755236ff3fe53eae7b0f131bfbf57647ba29 Parents: 9c98a45 Author: sblackmon <sblack...@apache.org> Authored: Tue Sep 16 13:59:20 2014 -0500 Committer: sblackmon <sblack...@apache.org> Committed: Tue Sep 16 13:59:20 2014 -0500 ---------------------------------------------------------------------- .../ElasticsearchPersistDeleter.java | 20 +------ .../ElasticsearchPersistUpdater.java | 20 +------ .../ElasticsearchPersistWriter.java | 62 ++++++++++++++------ 3 files changed, 51 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java index 95004f5..319cece 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java @@ -51,23 +51,9 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl LOGGER.debug("Delete Metadata: {}", metadata); - String index = null; - String type = null; - String id = streamsDatum.getId(); - - if( metadata 
!= null && metadata.containsKey("index")) - index = (String) streamsDatum.getMetadata().get("index"); - if( metadata != null && metadata.containsKey("type")) - type = (String) streamsDatum.getMetadata().get("type"); - if( id == null && metadata != null && metadata.containsKey("id")) - id = (String) streamsDatum.getMetadata().get("id"); - - if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - index = config.getIndex(); - } - if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - type = config.getType(); - } + String index = getIndex(metadata, config); + String type = getType(metadata, config); + String id = getId(streamsDatum); try { delete(index, type, id); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java index d0e8acd..872c65e 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java @@ -51,23 +51,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl LOGGER.debug("Update Metadata: {}", metadata); - String index = null; - String type = null; - String id = streamsDatum.getId(); - - if( metadata != null && metadata.containsKey("index")) - index = (String) streamsDatum.getMetadata().get("index"); - if( metadata != null && metadata.containsKey("type")) - type = (String) 
streamsDatum.getMetadata().get("type"); - if( id == null && metadata != null && metadata.containsKey("id")) - id = (String) streamsDatum.getMetadata().get("id"); - - if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - index = config.getIndex(); - } - if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - type = config.getType(); - } + String index = getIndex(metadata, config); + String type = getType(metadata, config); + String id = getId(streamsDatum); String json; try { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index ca643b9..4ec3315 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -145,23 +145,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt LOGGER.debug("Write Metadata: {}", metadata); - String index = null; - String type = null; - String id = streamsDatum.getId(); - - if( metadata != null && metadata.containsKey("index")) - index = (String) streamsDatum.getMetadata().get("index"); - if( metadata != null && metadata.containsKey("type")) - type = (String) streamsDatum.getMetadata().get("type"); - if( id == null && metadata != null && metadata.containsKey("id")) - id = (String) streamsDatum.getMetadata().get("id"); - - if(index == null || 
(config.getForceUseConfig() != null && config.getForceUseConfig())) { - index = config.getIndex(); - } - if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - type = config.getType(); - } + String index = getIndex(metadata, config); + String type = getType(metadata, config); + String id = getId(streamsDatum); try { add(index, type, id, @@ -506,4 +492,46 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis), MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding())); } + + protected String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { + + String index = null; + + if( metadata != null && metadata.containsKey("index")) + index = (String) metadata.get("index"); + + if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + index = config.getIndex(); + } + + return index; + } + + protected String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { + + String type = null; + + if( metadata != null && metadata.containsKey("type")) + type = (String) metadata.get("type"); + + if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + type = config.getType(); + } + + + return type; + } + + protected String getId(StreamsDatum datum) { + + String id = datum.getId(); + + Map<String, Object> metadata = datum.getMetadata(); + + if( id == null && metadata != null && metadata.containsKey("id")) + id = (String) datum.getMetadata().get("id"); + + return id; + } + }