[ https://issues.apache.org/jira/browse/STREAMS-221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236200#comment-14236200 ]
ASF GitHub Bot commented on STREAMS-221:
----------------------------------------
Github user robdouglas commented on a diff in the pull request:
https://github.com/apache/incubator-streams/pull/153#discussion_r21404184
--- Diff: streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java ---
@@ -0,0 +1,189 @@
+package org.apache.streams.components.http.persist;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+/**
+ * Created by steve on 11/12/14.
+ */
+public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
+
+ private final static String STREAMS_ID = "SimpleHTTPPostPersistWriter";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPPostPersistWriter.class);
+
+ protected ObjectMapper mapper;
+
+ protected URIBuilder uriBuilder;
+
+ protected CloseableHttpClient httpclient;
+
+ protected HttpPersistWriterConfiguration configuration;
+
+ public SimpleHTTPPostPersistWriter() {
+ this(HttpConfigurator.detectPersistWriterConfiguration(StreamsConfigurator.config.getConfig("http")));
+ }
+
+ public SimpleHTTPPostPersistWriter(HttpPersistWriterConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+
+ @Override
+ public void write(StreamsDatum entry) {
+
+ ObjectNode payload = preparePayload(entry);
+
+ Map<String, String> params = prepareParams(entry);
+
+ URI uri = prepareURI(params);
+
+ HttpPost httppost = prepareHttpPost(uri, payload);
+
+ ObjectNode result = executePost(httppost);
+
+ try {
+ LOGGER.debug(mapper.writeValueAsString(result));
+ } catch (JsonProcessingException e) {
+ LOGGER.warn("Non-json response", e.getMessage());
+ }
+ }
+
+ /**
+ Override this to alter request URI
+ */
+ protected URI prepareURI(Map<String, String> params) {
+ URI uri = null;
+ for( Map.Entry<String,String> param : params.entrySet()) {
+ uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
+ }
+ try {
+ uri = uriBuilder.build();
+ } catch (URISyntaxException e) {
+ LOGGER.error("URI error {}", uriBuilder.toString());
+ }
+ return uri;
+ }
+
+ /**
+ Override this to add parameters to the request
+ */
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+ return Maps.newHashMap();
+ }
+
+ /**
+ Override this to alter json payload on to the request
+ */
+ protected ObjectNode preparePayload(StreamsDatum entry) {
+
+ return (ObjectNode) entry.getDocument();
+ }
+
+ /**
+ Override this to add headers to the request
+ */
+ public HttpPost prepareHttpPost(URI uri, ObjectNode payload) {
+ HttpPost httppost = new HttpPost(uri);
+ httppost.addHeader("content-type", this.configuration.getContentType());
+// TODO: add support for authentication
+// if( !Strings.isNullOrEmpty(authHeader))
+// httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+ try {
+ String entity = mapper.writeValueAsString(payload);
+ httppost.setEntity(new StringEntity(entity));
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ return httppost;
+ }
+
+ protected ObjectNode executePost(HttpPost httpPost) {
+
+ Preconditions.checkNotNull(httpPost);
+
+ ObjectNode result = null;
+
+ CloseableHttpResponse response = null;
+
+ String entityString = null;
+ try {
+ response = httpclient.execute(httpPost);
+ HttpEntity entity = response.getEntity();
+ // TODO: handle retry
+ if (response.getStatusLine() != null && response.getStatusLine().getStatusCode() >= 200 && entity != null) {
--- End diff --
Is there a response code enum we could use instead of '200'?
> Simple HTTP Persist Writer
> --------------------------
>
> Key: STREAMS-221
> URL: https://issues.apache.org/jira/browse/STREAMS-221
> Project: Streams
> Issue Type: New Feature
> Reporter: Steve Blackmon
> Assignee: Steve Blackmon
>
> Add a utility persist writer that posts documents to a remote end-point.
> This writer should be able to pass messages between streams pipelines
> executing on other hosts in conjunction with STREAMS-222.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)