Fixed bug where meta data was not be copied to cloned datums
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a6bd76a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a6bd76a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a6bd76a4 Branch: refs/heads/master Commit: a6bd76a49f5f8f7f4b94ea390d6c90a8c128c6ed Parents: d8feb5b Author: rebanks <[email protected]> Authored: Wed Apr 2 16:15:43 2014 -0500 Committer: rebanks <[email protected]> Committed: Wed Apr 2 16:15:43 2014 -0500 ---------------------------------------------------------------------- .../streams/local/tasks/BaseStreamsTask.java | 24 ++++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a6bd76a4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 694cb76..8006560 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -10,10 +10,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; +import java.util.*; /** * @@ -116,13 +113,13 @@ public abstract class BaseStreamsTask implements StreamsTask { try { if(datum.document instanceof ObjectNode) { - return new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid); + return copyMetaData(datum, new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid)); } else if(datum.document instanceof Activity) { - return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class), + return copyMetaData(datum, new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class), datum.timestamp, - datum.sequenceid); + datum.sequenceid)); } // else if(this.mapper.canSerialize(datum.document.getClass())){ // return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()), @@ -156,4 +153,17 @@ public abstract class BaseStreamsTask implements StreamsTask { } while( !success ); } + + private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum copyTo) { + Map<String, Object> fromMeta = copyFrom.getMetadata(); + Map<String, Object> toMeta = copyTo.getMetadata(); + for(String key : fromMeta.keySet()) { + Object value = fromMeta.get(key); + if(value instanceof Serializable) + toMeta.put(key, SerializationUtil.cloneBySerialization(value)); + else //hope for the best - should be serializable + toMeta.put(key, value); + } + return copyTo; + } }
