Repository: incubator-eagle Updated Branches: refs/heads/master ad8cc9c99 -> 8f8fe57e6
[EAGLE-761] remove partition key in kafkaStreamSink https://issues.apache.org/jira/browse/EAGLE-761 Author: Zhao, Qingwen <qingwz...@apache.org> Closes #643 from qingwen220/EAGLE-761. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8f8fe57e Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8f8fe57e Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8f8fe57e Branch: refs/heads/master Commit: 8f8fe57e6ab7484f400333d3369bb849cb2c0e6e Parents: ad8cc9c Author: Zhao, Qingwen <qingwz...@apache.org> Authored: Thu Nov 10 15:55:13 2016 +0800 Committer: Zhao, Qingwen <qingwz...@apache.org> Committed: Thu Nov 10 15:55:13 2016 +0800 ---------------------------------------------------------------------- .../main/java/org/apache/eagle/app/sink/KafkaStreamSink.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8f8fe57e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java index 2207c8c..12acb81 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java @@ -18,13 +18,11 @@ package org.apache.eagle.app.sink; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; -import org.apache.eagle.metadata.utils.StreamIdConversions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +62,9 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> { protected void execute(Object key, Map event, OutputCollector collector) throws Exception { try { String output = new ObjectMapper().writeValueAsString(event); - producer.send(new KeyedMessage(this.topicId, key, output)); + // partition key may cause data skew + //producer.send(new KeyedMessage(this.topicId, key, output)); + producer.send(new KeyedMessage(this.topicId, output)); } catch (Exception ex) { LOG.error(ex.getMessage(), ex); throw ex;