Repository: incubator-eagle
Updated Branches:
  refs/heads/master ad8cc9c99 -> 8f8fe57e6


[EAGLE-761] remove partition key in kafkaStreamSink

https://issues.apache.org/jira/browse/EAGLE-761

Author: Zhao, Qingwen <qingwz...@apache.org>

Closes #643 from qingwen220/EAGLE-761.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8f8fe57e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8f8fe57e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8f8fe57e

Branch: refs/heads/master
Commit: 8f8fe57e6ab7484f400333d3369bb849cb2c0e6e
Parents: ad8cc9c
Author: Zhao, Qingwen <qingwz...@apache.org>
Authored: Thu Nov 10 15:55:13 2016 +0800
Committer: Zhao, Qingwen <qingwz...@apache.org>
Committed: Thu Nov 10 15:55:13 2016 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/eagle/app/sink/KafkaStreamSink.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8f8fe57e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index 2207c8c..12acb81 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -18,13 +18,11 @@ package org.apache.eagle.app.sink;
 
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
-import org.apache.eagle.metadata.utils.StreamIdConversions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,7 +62,9 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
     protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
         try {
             String output = new ObjectMapper().writeValueAsString(event);
-            producer.send(new KeyedMessage(this.topicId, key, output));
+            // partition key may cause data skew
+            //producer.send(new KeyedMessage(this.topicId, key, output));
+            producer.send(new KeyedMessage(this.topicId, output));
         } catch (Exception ex) {
             LOG.error(ex.getMessage(), ex);
             throw ex;

Reply via email to