Repository: beam Updated Branches: refs/heads/master e43a290ef -> a991c9d85
add SpEL to hide kafka client difference of 0.9/0.10 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ab850aca Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ab850aca Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ab850aca Branch: refs/heads/master Commit: ab850acae39fac2d02f1663599379c7c8a76a41c Parents: e43a290 Author: mingmxu <ming...@ebay.com> Authored: Fri Feb 17 23:11:19 2017 -0800 Committer: Davor Bonaci <da...@google.com> Committed: Tue Feb 21 09:33:19 2017 -0800 ---------------------------------------------------------------------- pom.xml | 7 +++ sdks/java/io/kafka/pom.xml | 5 ++ .../apache/beam/sdk/io/kafka/ConsumerSpEL.java | 60 ++++++++++++++++++++ .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 20 ++++--- 4 files changed, 85 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ab850aca/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0688b73..9cbaf67 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,7 @@ <stax2.version>3.1.4</stax2.version> <storage.version>v1-rev71-1.22.0</storage.version> <woodstox.version>4.4.1</woodstox.version> + <spring.version>4.3.5.RELEASE</spring.version> <compiler.error.flag>-Werror</compiler.error.flag> <compiler.default.pkginfo.flag>-Xpkginfo:always</compiler.default.pkginfo.flag> @@ -878,6 +879,12 @@ <artifactId>byte-buddy</artifactId> <version>1.6.8</version> </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-expression</artifactId> + <version>${spring.version}</version> + </dependency> <!-- Testing --> http://git-wip-us.apache.org/repos/asf/beam/blob/ab850aca/sdks/java/io/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml index 
d25fb3f..d66463a 100644 --- a/sdks/java/io/kafka/pom.xml +++ b/sdks/java/io/kafka/pom.xml @@ -139,6 +139,11 @@ <artifactId>auto-value</artifactId> <scope>provided</scope> </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-expression</artifactId> + </dependency> <!-- test dependencies--> <dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/ab850aca/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java new file mode 100644 index 0000000..5b63bf8 --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kafka; + +import java.util.Collection; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.SpelParserConfiguration; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; + +/**. + * ConsumerSpEL to handle multiple versions of the Consumer API between Kafka 0.9 and 0.10. + * It auto detects the input type List/Collection/Varargs, + * to eliminate the method definition differences. + */ +class ConsumerSpEL { + SpelParserConfiguration config = new SpelParserConfiguration(true, true); + ExpressionParser parser = new SpelExpressionParser(config); + + Expression seek2endExpression = + parser.parseExpression("#consumer.seekToEnd(#tp)"); + + Expression assignExpression = + parser.parseExpression("#consumer.assign(#tp)"); + + public ConsumerSpEL() {} + + public void evaluateSeek2End(Consumer consumer, TopicPartition topicPartitions) { + StandardEvaluationContext mapContext = new StandardEvaluationContext(); + mapContext.setVariable("consumer", consumer); + mapContext.setVariable("tp", topicPartitions); + seek2endExpression.getValue(mapContext); + } + + public void evaluateAssign(Consumer consumer, Collection<TopicPartition> topicPartitions) { + StandardEvaluationContext mapContext = new StandardEvaluationContext(); + mapContext.setVariable("consumer", consumer); + mapContext.setVariable("tp", topicPartitions); + assignExpression.getValue(mapContext); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ab850aca/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 80a0eb7..5fd34b9 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -98,7 +98,9 @@ import org.slf4j.LoggerFactory; /** * An unbounded source and a sink for <a href="http://kafka.apache.org/">Kafka</a> topics. - * Kafka version 0.9 and above are supported. + * Kafka version 0.9 and 0.10 are supported. If you need a specific version of Kafka + * client(e.g. 0.9 for 0.9 servers, or 0.10 for security features), specify explicit + * kafka-client dependency. * * <h3>Reading from Kafka topics</h3> * @@ -212,7 +214,7 @@ public class KafkaIO { .setTopicPartitions(new ArrayList<TopicPartition>()) .setKeyCoder(ByteArrayCoder.of()) .setValueCoder(ByteArrayCoder.of()) - .setConsumerFactoryFn(Read.KAFKA_9_CONSUMER_FACTORY_FN) + .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN) .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES) .setMaxNumRecords(Long.MAX_VALUE) .build(); @@ -228,7 +230,7 @@ public class KafkaIO { return new AutoValue_KafkaIO_Read.Builder<K, V>() .setTopics(new ArrayList<String>()) .setTopicPartitions(new ArrayList<TopicPartition>()) - .setConsumerFactoryFn(Read.KAFKA_9_CONSUMER_FACTORY_FN) + .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN) .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES) .setMaxNumRecords(Long.MAX_VALUE) .build(); @@ -492,7 +494,7 @@ public class KafkaIO { // default Kafka 0.9 Consumer supplier. 
private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> - KAFKA_9_CONSUMER_FACTORY_FN = + KAFKA_CONSUMER_FACTORY_FN = new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() { public Consumer<byte[], byte[]> apply(Map<String, Object> config) { return new KafkaConsumer<>(config); @@ -712,6 +714,9 @@ public class KafkaIO { private static final long UNINITIALIZED_OFFSET = -1; + //Add SpEL instance to cover the interface difference of Kafka client + private transient ConsumerSpEL consumerSpEL; + /** watermark before any records have been read. */ private static Instant initialWatermark = new Instant(Long.MIN_VALUE); @@ -851,9 +856,10 @@ public class KafkaIO { @Override public boolean start() throws IOException { + this.consumerSpEL = new ConsumerSpEL(); Read<K, V> spec = source.spec; consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig()); - consumer.assign(spec.getTopicPartitions()); + consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions()); for (PartitionState p : partitionStates) { if (p.nextOffset != UNINITIALIZED_OFFSET) { @@ -889,7 +895,7 @@ public class KafkaIO { offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig); - offsetConsumer.assign(spec.getTopicPartitions()); + consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions()); offsetFetcherThread.scheduleAtFixedRate( new Runnable() { @@ -987,7 +993,7 @@ public class KafkaIO { private void updateLatestOffsets() { for (PartitionState p : partitionStates) { try { - offsetConsumer.seekToEnd(p.topicPartition); + consumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition); long offset = offsetConsumer.position(p.topicPartition); p.setLatestOffset(offset); } catch (Exception e) {