GitHub user tweise commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/134#discussion_r48075562
  
    --- Diff: kafka/src/main/java/com/datatorrent/contrib/kafka090/AbstractKafkaInputOperator.java ---
    @@ -0,0 +1,410 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka090;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.commons.lang3.tuple.Pair;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +
    +import com.google.common.base.Joiner;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.Partitioner;
    +import com.datatorrent.api.StatsListener;
    +
    +/**
    + * The abstract kafka input operator using kafka 0.9.0 new consumer API
    + * A scalable, fault-tolerant, at-least-once kafka input operator
    + * Key features include:
    + * 1. Out-of-box one-to-one and one-to-many partition schema support plus customizable partition schema
    + *    refer to AbstractKafkaPartitioner
    + * 2. Fault-tolerant: when the input operator goes down, it redeploys on another node
    + * 3. At-least-once semantics for operator failure (no matter which operator fails)
    + * 4. At-least-once semantics for cold restart (no data loss even if you restart the application)
    + * 5. Multi-cluster support, one operator can consume data from more than one kafka cluster
    + * 6. Multi-topic support, one operator can subscribe to multiple topics
    + * 7. Throughput control support, you can throttle the number of tuples for each streaming window
    + */
    +public abstract class AbstractKafkaInputOperator<K, V> implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointListener, Partitioner<AbstractKafkaInputOperator>, StatsListener
    +{
    +
    +
    +  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
    +
    +  public enum InitialOffset {
    +    EARLIEST, // consume from beginning of the partition every time when application restart
    +    LATEST, // consume from latest of the partition every time when application restart
    +    APPLICATION_OR_EARLIEST, // consume from committed position from last run or earliest if there is no committed offset(s)
    +    APPLICATION_OR_LATEST // consume from committed position from last run or latest if there is no committed offset(s)
    +  }
    +  /**
    +   *  nodes are separated by ',' and clusters by ';'
    +   *  ex: c1n1,c1n2;c2n1,c2n2
    +   */
    +  @NotNull
    +  private String[] clusters;
    +
    +  /**
    +   * The topics the operator consumes
    +   */
    +  @NotNull
    +  private String[] topics;
    +
    +  /**
    +   * Wrapper consumer object
    +   * It wraps KafkaConsumer, maintains the consumer thread and stores messages in a queue
    +   */
    +  private KafkaConsumerWrapper<K, V> consumerWrapper = new 
KafkaConsumerWrapper<>();
    +
    +  /**
    +   * Assignment for each operator instance
    +   */
    +  private Set<AbstractKafkaPartitioner.PartitionMeta> assignment;
    +
    +  /**
    +   *  offset track for checkpoint
    +   */
    +  private Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>();
    +
    +  /**
    +   * store offsets with window id, only keep offsets with windows that have not been committed
    +   */
    +  private transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>();
    +
    +  /**
    +   * initial partition count
    +   * only used with PartitionStrategy.ONE_TO_MANY
    +   */
    +  private int initialPartitionCount = 1;
    --- End diff --
    
    This does not work when it is a property of another property (with configuration, you cannot assume the order in which properties are set). If the partitioner is not going to be a property, then it may work.
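
To illustrate one reading of the ordering concern, here is a minimal sketch. Everything in it is hypothetical and not taken from the pull request: `SketchPartitioner`, `EagerOperator`, `DeferredOperator`, and the `setup()` step are made-up names, and `setup()` is simply assumed to run after all properties have been set. The eager variant forwards the count to the partitioner inside the setter, so the value is lost when the count happens to be set before the partitioner. The deferred variant only stores values in setters and combines them later.

```java
public class PropertyOrderSketch {

  /** Hypothetical stand-in for a partitioner that is itself set as a property. */
  static class SketchPartitioner {
    int partitionCount = 1;
  }

  /** Fragile variant: the setter pushes the value into another property right away. */
  static class EagerOperator {
    private SketchPartitioner partitioner;

    void setPartitioner(SketchPartitioner partitioner) {
      this.partitioner = partitioner;
    }

    void setInitialPartitionCount(int count) {
      // If the partitioner has not been set yet, the value is silently lost.
      if (partitioner != null) {
        partitioner.partitionCount = count;
      }
    }
  }

  /** Order-independent variant: setters only store values, setup() combines them. */
  static class DeferredOperator {
    private SketchPartitioner partitioner;
    private int initialPartitionCount = 1;

    void setPartitioner(SketchPartitioner partitioner) {
      this.partitioner = partitioner;
    }

    void setInitialPartitionCount(int count) {
      this.initialPartitionCount = count;
    }

    void setup() {
      // Assumed to run after every setter, so setter order no longer matters.
      partitioner.partitionCount = initialPartitionCount;
    }
  }

  /** Applies both properties to the eager variant in the requested order. */
  static int eagerResult(boolean countFirst) {
    EagerOperator op = new EagerOperator();
    SketchPartitioner p = new SketchPartitioner();
    if (countFirst) {
      op.setInitialPartitionCount(4);
      op.setPartitioner(p);
    } else {
      op.setPartitioner(p);
      op.setInitialPartitionCount(4);
    }
    return p.partitionCount;
  }

  /** Applies both properties to the deferred variant, then runs setup(). */
  static int deferredResult(boolean countFirst) {
    DeferredOperator op = new DeferredOperator();
    SketchPartitioner p = new SketchPartitioner();
    if (countFirst) {
      op.setInitialPartitionCount(4);
      op.setPartitioner(p);
    } else {
      op.setPartitioner(p);
      op.setInitialPartitionCount(4);
    }
    op.setup();
    return p.partitionCount;
  }

  public static void main(String[] args) {
    System.out.println("eager, count first: " + eagerResult(true));
    System.out.println("eager, partitioner first: " + eagerResult(false));
    System.out.println("deferred, count first: " + deferredResult(true));
    System.out.println("deferred, partitioner first: " + deferredResult(false));
  }
}
```

In this sketch the eager variant ends up with a count of 1 when the count is set first and 4 otherwise, while the deferred variant yields 4 in both orders.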


