adding backup job; adding HDFS writer config
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5d0f6bc4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5d0f6bc4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5d0f6bc4 Branch: refs/heads/master Commit: 5d0f6bc422bb68c245c75a1c005568575324b06b Parents: 9e757ae Author: Steve Blackmon <[email protected]> Authored: Sun Mar 30 16:55:18 2014 -0500 Committer: Steve Blackmon <[email protected]> Committed: Sun Mar 30 16:55:18 2014 -0500 ---------------------------------------------------------------------- .../apache/streams/hdfs/HdfsConfigurator.java | 12 ++++++++++ .../streams/hdfs/WebHdfsPersistWriter.java | 23 ++++++++++++-------- 2 files changed, 26 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5d0f6bc4/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java index dfbc273..f9790e8 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java @@ -44,4 +44,16 @@ public class HdfsConfigurator { return hdfsReaderConfiguration; } + public static HdfsWriterConfiguration detectWriterConfiguration(Config hdfs) { + + HdfsConfiguration hdfsConfiguration = detectConfiguration(hdfs); + HdfsWriterConfiguration hdfsWriterConfiguration = mapper.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class); + + String writerPath = hdfs.getString("writerPath"); + + 
hdfsWriterConfiguration.setWriterPath(writerPath); + + return hdfsWriterConfiguration; + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5d0f6bc4/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java index 55c55b3..12f9a5e 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java @@ -3,21 +3,16 @@ package org.apache.streams.hdfs; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; -import com.typesafe.config.Config; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.core.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.streams.hdfs.HdfsConfiguration; - import java.io.Closeable; import java.io.Flushable; import java.io.IOException; @@ -25,10 +20,12 @@ import java.io.OutputStreamWriter; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; -import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.ArrayList; +import java.util.Date; +import 
java.util.List; +import java.util.Queue; -public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable +public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable { public final static String STREAMS_ID = "WebHdfsPersistWriter"; @@ -264,4 +261,12 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl e.printStackTrace(); } } + + @Override + public DatumStatusCounter getDatumStatusCounter() { + DatumStatusCounter counters = new DatumStatusCounter(); + counters.incrementAttempt(this.totalRecordsWritten); + counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten); + return counters; + } }
