Github user tweise commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/134#discussion_r48646883
--- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import com.google.common.base.Joiner;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+
+/**
+ * The abstract Kafka input operator, using the Kafka 0.9.0 new consumer API
+ * A scalable, fault-tolerant, at-least-once Kafka input operator
+ * Key features include:
+ * 1. Out-of-the-box one-to-one and one-to-many partition strategy support, plus a customizable partition strategy;
+ * refer to AbstractKafkaPartitioner
+ * 2. Fault tolerance: when the input operator goes down, it is redeployed on another node
+ * 3. At-least-once semantics for operator failure (no matter which operator fails)
+ * 4. At-least-once semantics for cold restart (no data loss even if you restart the application)
+ * 5. Multi-cluster support: one operator can consume data from more than one Kafka cluster
+ * 6. Multi-topic support: one operator can subscribe to multiple topics
+ * 7. Throughput control support: you can throttle the number of tuples emitted in each streaming window
+ */
+public abstract class AbstractKafkaInputOperator<K, V> implements InputOperator,
+ Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointListener,
+ Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
+
+ public long getMetricsRefreshInterval()
+ {
+ return metricsRefreshInterval;
+ }
+
+ public void setMetricsRefreshInterval(long metricsRefreshInterval)
+ {
+ this.metricsRefreshInterval = metricsRefreshInterval;
+ }
+
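+ /**
+ * Where the consumer starts consuming from when the application starts
+ */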
+ public enum InitialOffset
+ {
+ EARLIEST, // consume from the beginning of the partition every time the application restarts
+ LATEST, // consume from the latest offset of the partition every time the application restarts
+ APPLICATION_OR_EARLIEST, // consume from the committed position of the last run, or earliest if there are no committed offsets
+ APPLICATION_OR_LATEST // consume from the committed position of the last run, or latest if there are no committed offsets
+ }
+ /**
+ * Same setting as the bootstrap.servers property of KafkaConsumer,
+ * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs
+ * To support multiple clusters, you can have multiple bootstrap.servers values separated by ";"
+ */
+ @NotNull
+ private String[] clusters;
+
+ /**
+ * The topics the operator consumes
+ */
+ @NotNull
+ private String[] topics;
+
+ /**
+ * Wrapper consumer object
+ * It wraps KafkaConsumer, maintains the consumer thread and stores messages in a queue
+ */
+ private KafkaConsumerWrapper consumerWrapper = new KafkaConsumerWrapper();
+
+ /**
+ * Assignment for each operator instance
+ */
+ private Set<AbstractKafkaPartitioner.PartitionMeta> assignment;
+
+ /**
+ * Offsets tracked for checkpointing
+ */
+ private Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>();
+
+ /**
+ * Store offsets with window id; only keep offsets for windows that have not been committed
+ */
+ private transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>();
+
+ /**
+ * initial partition count
+ * only used with PartitionStrategy.ONE_TO_MANY
+ */
+ private int initialPartitionCount = 1;
+
+ // Minimal interval between 2 (re)partition actions
+ private long repartitionInterval = 30000L;
+
+ // Minimal interval between checking the collected stats and deciding whether to repartition,
+ // and minimal interval between 2 offset updates
+ private long repartitionCheckInterval = 5000L;
+
+ // class name of the key deserializer
+ protected String keyDeserializer;
+
+ // class name of the value deserializer
+ protected String valueDeserializer;
+
+ /**
+ * maximum tuples allowed to be emitted in each window
+ */
+ @Min(1)
+ private int maxTuplesPerWindow = Integer.MAX_VALUE;
+
+ private InitialOffset initialOffset = InitialOffset.APPLICATION_OR_LATEST;
+
+ private long metricsRefreshInterval = 5000L;
+
+ /**
+ * extra kafka consumer properties
+ * http://kafka.apache.org/090/documentation.html#newconsumerconfigs
+ *
+ * Please be aware that the properties below are set by the operator; don't override them:
+ *
+ * bootstrap.servers
+ * group.id
+ * auto.offset.reset
+ * enable.auto.commit
+ * partition.assignment.strategy
+ */
+ private Properties consumerProps;
+
+ /**
+ * Count of messages emitted in each window
+ * Not settable
+ */
+ private transient int emitCount = 0;
+
+ /**
+ * Application name is used as group.id for kafka consumer
+ */
+ private transient String applicationName;
+
+ private transient AbstractKafkaPartitioner partitioner;
+
+ protected transient long currentWindowId;
+
+ // By default the partition policy is 1:1
+ private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
+
+ private transient long lastCheckTime = 0L;
+
+ private transient long lastRepartitionTime = 0L;
+
+ @AutoMetric
+ private transient KafkaMetrics metrics;
+
+ private transient Deserializer<K> keyDeser;
+
+ private transient Deserializer<V> valueDeser;
+
+ @Override
+ public void activate(Context.OperatorContext context)
+ {
+ consumerWrapper.start();
+ }
+
+ @Override
+ public void deactivate()
+ {
+ consumerWrapper.stop();
+ }
+
+ @Override
+ public void checkpointed(long l)
+ {
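+ // no-op: offsets are committed to Kafka in committed(), not at checkpoint time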
+
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ //ask the kafka consumer wrapper to commit the offsets for the committed window
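+ //committing only after the platform marks the window committed is what provides
+ //the at-least-once guarantee on recovery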
+ for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
+ Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
+ if (item.getLeft() <= windowId) {
+ if (item.getLeft() == windowId) {
+ consumerWrapper.commitOffsets(item.getRight());
+ }
+ iter.remove();
+ }
+ }
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ int count = consumerWrapper.messageSize();
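+ // cap the messages emitted this window so emitCount never exceeds maxTuplesPerWindow (throughput control)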
+ if (maxTuplesPerWindow > 0) {
+ count = Math.min(count, maxTuplesPerWindow - emitCount);
+ }
+ for (int i = 0; i < count; i++) {
+ Pair<String, ConsumerRecord<byte[], byte[]>> tuple = consumerWrapper.pollMessage();
+ ConsumerRecord<byte[], byte[]> msg = tuple.getRight();
+ emitTuple(tuple.getLeft(), msg.topic(), msg.partition(), msg.offset(),
+ keyDeser.deserialize(msg.topic(), msg.key()), valueDeser.deserialize(msg.topic(), msg.value()));
+ AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(), msg.topic(), msg.partition());
+ offsetTrack.put(pm, msg.offset());
+ }
+ emitCount += count;
+ }
+
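+ /**
+ * Emit one consumed message to the downstream port(s).
+ * A minimal sketch of a concrete implementation (the output port below is an
+ * illustrative assumption, not part of this class):
+ *
+ * <pre>{@code
+ * public final transient DefaultOutputPort<V> output = new DefaultOutputPort<>();
+ *
+ * protected void emitTuple(String cluster, String topic, long partition, long offset, K key, V value)
+ * {
+ *   output.emit(value);
+ * }
+ * }</pre>
+ */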
+ protected abstract void emitTuple(String cluster, String topic, long partition, long offset, K key, V value);
+
+ @Override
+ public void beginWindow(long wid)
+ {
+ emitCount = 0;
+ currentWindowId = wid;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ // copy current offset track to history memory
+ Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsWithWindow = new HashMap<>(offsetTrack);
+ offsetHistory.add(Pair.of(currentWindowId, offsetsWithWindow));
+
+ //update metrics
+ metrics.updateMetrics(clusters, consumerWrapper.getAllConsumerMetrics());
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ applicationName = context.getValue(Context.DAGContext.APPLICATION_NAME);
+ consumerWrapper.create(this);
+ metrics = new KafkaMetrics(metricsRefreshInterval);
+ initDeserializer();
+ }
+
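+ /**
+ * Instantiate the key and value deserializers from the class names set on the operator,
+ * falling back to the deserializer classes in the extra consumer properties
+ */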
+ private void initDeserializer()
+ {
+ Map<String, Object> extraProperties = new HashMap<>();
+ if (consumerProps != null) {
+ for (Map.Entry<Object, Object> e : consumerProps.entrySet()) {
+ extraProperties.put(e.getKey().toString(), e.getValue());
+ }
+ }
+ try {
+ String cname = keyDeserializer;
+ if (cname == null) {
+ cname = consumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ }
+ keyDeser = (Deserializer<K>)Class.forName(cname).newInstance();
+ keyDeser.configure(extraProperties, true);
+ cname = valueDeserializer;
+ if (cname == null) {
+ cname = consumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ }
+ valueDeser = (Deserializer<V>)Class.forName(cname).newInstance();
+ valueDeser.configure(extraProperties, false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+
+ }
+
+ private void initPartitioner()
+ {
+ if (partitioner == null) {
+ logger.info("Initialize Partitioner");
+ switch (strategy) {
+ case ONE_TO_ONE:
+ partitioner = new OneToOnePartitioner(clusters, topics, this);
+ break;
+ case ONE_TO_MANY:
+ partitioner = new OneToManyPartitioner(clusters, topics, this);
+ break;
+ case ONE_TO_MANY_HEURISTIC:
+ throw new UnsupportedOperationException("Not implemented yet");
+ default:
+ throw new RuntimeException("Invalid strategy");
+ }
+ logger.info("Actual Partitioner is {}", partitioner.getClass());
+ }
+
+ }
+
+ @Override
+ public Response processStats(BatchedOperatorStats batchedOperatorStats)
+ {
+ long t = System.currentTimeMillis();
+ if (repartitionInterval < 0 || repartitionCheckInterval < 0 ||
+ t - lastCheckTime < repartitionCheckInterval || t - lastRepartitionTime < repartitionInterval) {
+ // return false if dynamic repartitioning is disabled by a negative interval, or if it's
+ // within repartitionCheckInterval since the last time the stats were checked
+ Response response = new Response();
+ response.repartitionRequired = false;
+ return response;
+ }
+
+ try {
+ logger.debug("Process stats");
+ initPartitioner();
+ return partitioner.processStats(batchedOperatorStats);
+ } finally {
+ lastCheckTime = System.currentTimeMillis();
+ }
+ }
+
+ @Override
+ public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(
+ Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext)
+ {
+ logger.debug("Define partitions");
+ initPartitioner();
+ return partitioner.definePartitions(collection, partitioningContext);
+ }
+
+ @Override
+ public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
+ {
+ // update the last repartition time
+ lastRepartitionTime = System.currentTimeMillis();
+ initPartitioner();
+ partitioner.partitioned(map);
+ }
+
+ /**
+ * A callback from the consumer after it commits the offsets
+ * @param map the offsets that were committed
+ * @param e the exception raised during the commit, or null if the commit succeeded
+ */
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e)
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Commit offsets complete {} ", Joiner.on(';').withKeyValueSeparator("=").join(map));
+ }
+ if (e != null) {
+ logger.warn("Exceptions in committing offsets {} : {} ",
+ Joiner.on(';').withKeyValueSeparator("=").join(map), e);
+ }
+ }
+
+ //---------------------------------------------setters and getters----------------------------------------
+ public void setInitialPartitionCount(int partitionCount)
+ {
+ this.initialPartitionCount = partitionCount;
+ }
+
+ public int getInitialPartitionCount()
+ {
+ return initialPartitionCount;
+ }
+
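+ /**
+ * Set the clusters to consume from; brokers within one cluster are comma separated and
+ * multiple clusters are separated by ";", e.g. (hypothetical hosts)
+ * "node1:9092,node2:9092;nodeA:9092"
+ */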
+ public void setClusters(String clusters)
+ {
+ this.clusters = clusters.split(";");
+ }
+
+ public String getClusters()
+ {
+ return Joiner.on(';').join(clusters);
+ }
+
+ public void setTopics(String... topics)
+ {
+ this.topics = topics;
+ }
+
+ public String[] getTopics()
+ {
+ return topics;
+ }
+
+ public void setConsumerWrapper(KafkaConsumerWrapper consumerWrapper)
--- End diff ---
Why expose this as a property?