DocumentToMetadataProcessor utility processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6cd97b8e Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6cd97b8e Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6cd97b8e Branch: refs/heads/STREAMS-170 Commit: 6cd97b8e8a77c4a29c3f3079223a94a4ef41b877 Parents: 576b849 Author: Steve Blackmon <sblack...@w2odigital.com> Authored: Wed Oct 1 15:42:55 2014 -0500 Committer: Steve Blackmon <sblack...@w2odigital.com> Committed: Wed Oct 1 15:42:55 2014 -0500 ---------------------------------------------------------------------- .../DatumFromMetadataAsDocumentProcessor.java | 17 +-- .../processor/DocumentToMetadataProcessor.java | 118 +++++++++++++++++++ 2 files changed, 119 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6cd97b8e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java index 5e58bb0..9aea4c4 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java @@ -78,7 +78,7 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S return result; } - Map<String, Object> metadata = asMap(metadataObjectNode); + Map<String, Object> metadata = DocumentToMetadataProcessor.asMap(metadataObjectNode); if(entry == null || entry.getMetadata() == null) return result; @@ -128,19 +128,4 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S this.elasticsearchClientManager.getClient().close(); } - public Map<String, Object> asMap(JsonNode node) { - - Iterator<Map.Entry<String, JsonNode>> iterator = node.fields(); - Map<String, Object> ret = Maps.newHashMap(); - - Map.Entry<String, JsonNode> entry; - - while (iterator.hasNext()) { - entry = iterator.next(); - if( entry.getValue().asText() != null ) - ret.put(entry.getKey(), entry.getValue().asText()); - } - - return ret; - } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6cd97b8e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java new file mode 100644 index 0000000..804e1ac --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.elasticsearch.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.elasticsearch.ElasticsearchClientManager; +import org.apache.streams.elasticsearch.ElasticsearchConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Uses index and type in metadata to populate current document into datums + */ +public class DocumentToMetadataProcessor implements StreamsProcessor, Serializable { + + public final static String STREAMS_ID = "DatumFromMetadataProcessor"; + + private ElasticsearchClientManager elasticsearchClientManager; + private ElasticsearchReaderConfiguration config; + + private ObjectMapper mapper; + + public DocumentToMetadataProcessor() { + Config config = StreamsConfigurator.config.getConfig("elasticsearch"); + this.config = ElasticsearchConfigurator.detectReaderConfiguration(config); + } + + public DocumentToMetadataProcessor(Config config) { + this.config = ElasticsearchConfigurator.detectReaderConfiguration(config); + } + + public DocumentToMetadataProcessor(ElasticsearchReaderConfiguration config) { + this.config = config; + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + List<StreamsDatum> result = Lists.newArrayList(); + + ObjectNode metadataObjectNode; + try { + metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class); + } catch (IOException e) { + return result; + } + + Map<String, Object> metadata = asMap(metadataObjectNode); + + if(entry == null || metadata == null) + return result; + + entry.setMetadata(metadata); + + result.add(entry); + + return result; + } + + @Override + public void prepare(Object configurationObject) { + this.elasticsearchClientManager = new ElasticsearchClientManager(config); + mapper = StreamsJacksonMapper.getInstance(); + mapper.registerModule(new JsonOrgModule()); + + } + + @Override + public void cleanUp() { + this.elasticsearchClientManager.getClient().close(); + } + + public static Map<String, Object> asMap(JsonNode node) { + + Iterator<Map.Entry<String, JsonNode>> iterator = node.fields(); + Map<String, Object> ret = Maps.newHashMap(); + + Map.Entry<String, JsonNode> entry; + + while (iterator.hasNext()) { + entry = iterator.next(); + if( entry.getValue().asText() != null ) + ret.put(entry.getKey(), entry.getValue().asText()); + } + + return ret; + } +}