Repository: beam
Updated Branches:
  refs/heads/master e43a290ef -> a991c9d85


add SpEL to hide kafka client difference of 0.9/0.19


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ab850aca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ab850aca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ab850aca

Branch: refs/heads/master
Commit: ab850acae39fac2d02f1663599379c7c8a76a41c
Parents: e43a290
Author: mingmxu <ming...@ebay.com>
Authored: Fri Feb 17 23:11:19 2017 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Tue Feb 21 09:33:19 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |  7 +++
 sdks/java/io/kafka/pom.xml                      |  5 ++
 .../apache/beam/sdk/io/kafka/ConsumerSpEL.java  | 60 ++++++++++++++++++++
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 20 ++++---
 4 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ab850aca/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0688b73..9cbaf67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,7 @@
     <stax2.version>3.1.4</stax2.version>
     <storage.version>v1-rev71-1.22.0</storage.version>
     <woodstox.version>4.4.1</woodstox.version>
+    <spring.version>4.3.5.RELEASE</spring.version>
     
     <compiler.error.flag>-Werror</compiler.error.flag>
     
<compiler.default.pkginfo.flag>-Xpkginfo:always</compiler.default.pkginfo.flag>
@@ -878,6 +879,12 @@
         <artifactId>byte-buddy</artifactId>
         <version>1.6.8</version>
       </dependency>
+      
+      <dependency>
+        <groupId>org.springframework</groupId>
+        <artifactId>spring-expression</artifactId>
+        <version>${spring.version}</version>
+      </dependency>
 
       <!-- Testing -->
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ab850aca/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index d25fb3f..d66463a 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -139,6 +139,11 @@
       <artifactId>auto-value</artifactId>
       <scope>provided</scope>
     </dependency>
+    
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-expression</artifactId>
+    </dependency>
 
     <!-- test dependencies-->
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/ab850aca/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
new file mode 100644
index 0000000..5b63bf8
--- /dev/null
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.util.Collection;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
+import org.springframework.expression.spel.SpelParserConfiguration;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
+
+/**.
+ * ConsumerSpEL to handle multiple of versions of Consumer API between Kafka 
0.9 and 0.10.
+ * It auto detects the input type List/Collection/Varargs,
+ * to eliminate the method definition differences.
+ */
+class ConsumerSpEL {
+  SpelParserConfiguration config = new SpelParserConfiguration(true, true);
+  ExpressionParser parser = new SpelExpressionParser(config);
+
+  Expression seek2endExpression =
+      parser.parseExpression("#consumer.seekToEnd(#tp)");
+
+  Expression assignExpression =
+      parser.parseExpression("#consumer.assign(#tp)");
+
+  public ConsumerSpEL() {}
+
+  public void evaluateSeek2End(Consumer consumer, TopicPartition 
topicPartitions) {
+    StandardEvaluationContext mapContext = new StandardEvaluationContext();
+    mapContext.setVariable("consumer", consumer);
+    mapContext.setVariable("tp", topicPartitions);
+    seek2endExpression.getValue(mapContext);
+  }
+
+  public void evaluateAssign(Consumer consumer, Collection<TopicPartition> 
topicPartitions) {
+    StandardEvaluationContext mapContext = new StandardEvaluationContext();
+    mapContext.setVariable("consumer", consumer);
+    mapContext.setVariable("tp", topicPartitions);
+    assignExpression.getValue(mapContext);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ab850aca/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 80a0eb7..5fd34b9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -98,7 +98,9 @@ import org.slf4j.LoggerFactory;
 
 /**
  * An unbounded source and a sink for <a 
href="http://kafka.apache.org/";>Kafka</a> topics.
- * Kafka version 0.9 and above are supported.
+ * Kafka version 0.9 and 0.10 are supported. If you need a specific version of 
Kafka
+ * client(e.g. 0.9 for 0.9 servers, or 0.10 for security features), specify 
explicit
+ * kafka-client dependency.
  *
  * <h3>Reading from Kafka topics</h3>
  *
@@ -212,7 +214,7 @@ public class KafkaIO {
         .setTopicPartitions(new ArrayList<TopicPartition>())
         .setKeyCoder(ByteArrayCoder.of())
         .setValueCoder(ByteArrayCoder.of())
-        .setConsumerFactoryFn(Read.KAFKA_9_CONSUMER_FACTORY_FN)
+        .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
         .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
         .setMaxNumRecords(Long.MAX_VALUE)
         .build();
@@ -228,7 +230,7 @@ public class KafkaIO {
     return new AutoValue_KafkaIO_Read.Builder<K, V>()
         .setTopics(new ArrayList<String>())
         .setTopicPartitions(new ArrayList<TopicPartition>())
-        .setConsumerFactoryFn(Read.KAFKA_9_CONSUMER_FACTORY_FN)
+        .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
         .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
         .setMaxNumRecords(Long.MAX_VALUE)
         .build();
@@ -492,7 +494,7 @@ public class KafkaIO {
 
     // default Kafka 0.9 Consumer supplier.
     private static final SerializableFunction<Map<String, Object>, 
Consumer<byte[], byte[]>>
-      KAFKA_9_CONSUMER_FACTORY_FN =
+      KAFKA_CONSUMER_FACTORY_FN =
         new SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>() {
           public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
             return new KafkaConsumer<>(config);
@@ -712,6 +714,9 @@ public class KafkaIO {
 
     private static final long UNINITIALIZED_OFFSET = -1;
 
+    //Add SpEL instance to cover the interface difference of Kafka client
+    private transient ConsumerSpEL consumerSpEL;
+
     /** watermark before any records have been read. */
     private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
 
@@ -851,9 +856,10 @@ public class KafkaIO {
 
     @Override
     public boolean start() throws IOException {
+      this.consumerSpEL = new ConsumerSpEL();
       Read<K, V> spec = source.spec;
       consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
-      consumer.assign(spec.getTopicPartitions());
+      consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
 
       for (PartitionState p : partitionStates) {
         if (p.nextOffset != UNINITIALIZED_OFFSET) {
@@ -889,7 +895,7 @@ public class KafkaIO {
       offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false);
 
       offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
-      offsetConsumer.assign(spec.getTopicPartitions());
+      consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions());
 
       offsetFetcherThread.scheduleAtFixedRate(
           new Runnable() {
@@ -987,7 +993,7 @@ public class KafkaIO {
     private void updateLatestOffsets() {
       for (PartitionState p : partitionStates) {
         try {
-          offsetConsumer.seekToEnd(p.topicPartition);
+          consumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition);
           long offset = offsetConsumer.position(p.topicPartition);
           p.setLatestOffset(offset);
         } catch (Exception e) {

Reply via email to