Repository: kafka
Updated Branches:
  refs/heads/trunk bf7fb6321 -> c24740c7b


http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
new file mode 100644
index 0000000..0548fb4
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+/**
+ * TODO: Clean this after the consumer implementation is complete. Until then, 
it is useful to write some sample test code using the new APIs
+ *
+ */
+public class ConsumerExampleTest {
+    /**
+     * This example demonstrates how to use the consumer to leverage Kafka's 
group management functionality for automatic consumer load 
+     * balancing and failure detection. This example assumes that the offsets 
are stored in Kafka and are automatically committed periodically, 
+     * as controlled by the auto.commit.interval.ms config 
+     */
+//    @Test
+//    public void testConsumerGroupManagementWithAutoOffsetCommits() {
+//        Properties props = new Properties();
+//        props.put("metadata.broker.list", "localhost:9092");
+//        props.put("group.id", "test");
+//        props.put("session.timeout.ms", "1000");
+//        props.put("auto.commit.enable", "true");
+//        props.put("auto.commit.interval.ms", "10000");
+//        KafkaConsumer consumer = new KafkaConsumer(props);
+//        // subscribe to some topics
+//        consumer.subscribe("foo", "bar");
+//        boolean isRunning = true;
+//        while(isRunning) {
+//            Map<String, ConsumerRecords> records = consumer.poll(100);
+//            process(records);
+//        }
+//        consumer.close();
+//    }
+
+    /**
+     * This example demonstrates how to use the consumer to leverage Kafka's 
group management functionality for automatic consumer load 
+     * balancing and failure detection. This example assumes that the offsets 
are stored in Kafka and are manually committed using 
+     * either the commit() or commitAsync() APIs. This example also 
demonstrates rewinding the consumer's offsets if processing of consumed
+     * messages fails.
+     */
+//     @Test
+//     public void testConsumerGroupManagementWithManualOffsetCommit() {
+//         Properties props = new Properties();
+//         props.put("metadata.broker.list", "localhost:9092");
+//         props.put("group.id", "test");
+//         props.put("session.timeout.ms", "1000");
+//         props.put("auto.commit.enable", "false");
+//         KafkaConsumer consumer = new KafkaConsumer(props);
+//         // subscribe to some topics
+//         consumer.subscribe("foo", "bar");
+//         int commitInterval = 100;
+//         int numRecords = 0;
+//         boolean isRunning = true;
+//         Map<TopicPartition, Long> consumedOffsets = new 
HashMap<TopicPartition, Long>();
+//         while(isRunning) {
+//             Map<String, ConsumerRecords> records = consumer.poll(100);
+//             try {
+//                 Map<TopicPartition, Long> lastConsumedOffsets = 
process(records);
+//                 consumedOffsets.putAll(lastConsumedOffsets);
+//                 numRecords += records.size();
+//                 // commit offsets for all partitions of topics foo, bar 
synchronously, owned by this consumer instance
+//                 if(numRecords % commitInterval == 0) 
+//                     consumer.commit(true);
+//             } catch(Exception e) {
+//                 // rewind consumer's offsets for failed partitions
+//                 List<TopicPartition> failedPartitions = 
getFailedPartitions();
+//                 Map<TopicPartition, Long> offsetsToRewindTo = new 
HashMap<TopicPartition, Long>();
+//                 for(TopicPartition failedPartition : failedPartitions) {
+//                     // rewind to the last consumed offset for the failed 
partition. Since process() failed for this partition, the consumed offset
+//                     // should still be pointing to the last successfully 
processed offset and hence is the right offset to rewind consumption to.
+//                     offsetsToRewindTo.put(failedPartition, 
consumedOffsets.get(failedPartition));
+//                 }
+//                 // seek to new offsets only for partitions that failed the 
last process()
+//                 consumer.seek(offsetsToRewindTo);
+//             }
+//         }         
+//         consumer.close();
+//     }
+
+     private List<TopicPartition> getFailedPartitions() { return null; }
+
+    /**
+     * This example demonstrates the consumer can be used to leverage Kafka's 
group management functionality along with custom offset storage. 
+     * In this example, the assumption made is that the user chooses to store 
the consumer offsets outside Kafka. This requires the user to 
+     * plugin logic for retrieving the offsets from a custom store and provide 
the offsets to the consumer in the ConsumerRebalanceCallback
+     * callback. The onPartitionsAssigned callback is invoked after the 
consumer is assigned a new set of partitions on rebalance <i>and</i>
+     * before the consumption restarts post rebalance. This is the right place 
to supply offsets from a custom store to the consumer.
+     */
+//    @Test
+//    public void testConsumerRebalanceWithCustomOffsetStore() {
+//        Properties props = new Properties();
+//        props.put("metadata.broker.list", "localhost:9092");
+//        props.put("group.id", "test");
+//        props.put("session.timeout.ms", "1000");
+//        props.put("auto.commit.enable", "true");
+//        props.put("auto.commit.interval.ms", "10000");
+//        KafkaConsumer consumer = new KafkaConsumer(props,
+//                                                   new 
ConsumerRebalanceCallback() {
+//                                                        public void 
onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+//                                                           
Map<TopicPartition, Long> lastCommittedOffsets = 
getLastCommittedOffsetsFromCustomStore(partitions);
+//                                                           
consumer.seek(lastCommittedOffsets);
+//                                                        }
+//                                                        public void 
onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+//                                                            
Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions); // 
implemented by the user
+//                                                           
commitOffsetsToCustomStore(offsets);                                            
// implemented by the user
+//                                                        }
+//                                                        private 
Map<TopicPartition, Long> 
getLastCommittedOffsetsFromCustomStore(Collection<TopicPartition> partitions) {
+//                                                            return null;
+//                                                        }
+//                                                        private 
Map<TopicPartition, Long> getLastConsumedOffsets(Collection<TopicPartition> 
partitions) { return null; }
+//                                                        private void 
commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
+//                                                    });
+//        // subscribe to topics
+//        consumer.subscribe("foo", "bar");
+//        int commitInterval = 100;
+//        int numRecords = 0;
+//        boolean isRunning = true;
+//        while(isRunning) {
+//            Map<String, ConsumerRecords> records = consumer.poll(100);
+//            Map<TopicPartition, Long> consumedOffsets = process(records);
+//            numRecords += records.size();
+//            // commit offsets for all partitions of topics foo, bar 
synchronously, owned by this consumer instance
+//            if(numRecords % commitInterval == 0) 
+//                commitOffsetsToCustomStore(consumedOffsets);
+//        }
+//        consumer.close();
+//    }
+
+    /**
+     * This example demonstrates how the consumer can be used to leverage 
Kafka's group management functionality along with Kafka based offset storage. 
+     * In this example, the assumption made is that the user chooses to use 
Kafka based offset management. 
+     */
+//    @Test
+//    public void testConsumerRewindWithGroupManagementAndKafkaOffsetStorage() 
{
+//        Properties props = new Properties();
+//        props.put("metadata.broker.list", "localhost:9092");
+//        props.put("group.id", "test");
+//        props.put("session.timeout.ms", "1000");
+//        props.put("auto.commit.enable", "false");        
+//        KafkaConsumer consumer = new KafkaConsumer(props,
+//                                                   new 
ConsumerRebalanceCallback() {
+//                                                        boolean 
rewindOffsets = true;
+//                                                        public void 
onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+//                                                            
if(rewindOffsets) {
+//                                                            
Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(null);
+//                                                            
Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 
100);
+//                                                            
consumer.seek(newOffsets);
+//                                                            }
+//                                                        }
+//                                                        public void 
onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+//                                                            
consumer.commit(true);
+//                                                        }
+//                                                        // this API rewinds 
every partition back by numberOfMessagesToRewindBackTo messages 
+//                                                        private 
Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> 
currentOffsets,
+//                                                                             
                           long numberOfMessagesToRewindBackTo) {
+//                                                            
Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
+//                                                            
for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet()) {
+//                                                                
newOffsets.put(offset.getKey(), offset.getValue() - 
numberOfMessagesToRewindBackTo);
+//                                                            }
+//                                                            return 
newOffsets;
+//                                                        }
+//                                                    });
+//        // subscribe to topics
+//        consumer.subscribe("foo", "bar");
+//        int commitInterval = 100;
+//        int numRecords = 0;
+//        boolean isRunning = true;
+//        while(isRunning) {
+//            Map<String, ConsumerRecords> records = consumer.poll(100);
+//            Map<TopicPartition, Long> consumedOffsets = process(records);
+//            numRecords += records.size();
+//            // commit offsets for all partitions of topics foo, bar 
synchronously, owned by this consumer instance
+//            if(numRecords % commitInterval == 0) 
+//                commitOffsetsToCustomStore(consumedOffsets);
+//        }
+//        consumer.close();
+//    }
+
+    /**
+     * This example demonstrates how the consumer can be used to subscribe to 
specific partitions of certain topics and consume upto the latest
+     * available message for each of those partitions before shutting down. 
When used to subscribe to specific partitions, the user foregoes 
+     * the group management functionality and instead relies on manually 
configuring the consumer instances to subscribe to a set of partitions.
+     * This example assumes that the user chooses to use Kafka based offset 
storage. The user still has to specify a group.id to use Kafka 
+     * based offset management. However, session.timeout.ms is not required 
since the Kafka consumer only does failure detection with group 
+     * management.
+     */
+//    @Test
+//    public void testConsumerWithKafkaBasedOffsetManagement() {
+//        Properties props = new Properties();
+//        props.put("metadata.broker.list", "localhost:9092");
+//        props.put("group.id", "test");
+//        props.put("auto.commit.enable", "true");
+//        props.put("auto.commit.interval.ms", "10000");
+//        KafkaConsumer consumer = new KafkaConsumer(props);
+//        // subscribe to some partitions of topic foo
+//        TopicPartition partition0 = new TopicPartition("foo", 0);
+//        TopicPartition partition1 = new TopicPartition("foo", 1);
+//        TopicPartition[] partitions = new TopicPartition[2];
+//        partitions[0] = partition0;
+//        partitions[1] = partition1;
+//        consumer.subscribe(partitions);
+//        // find the last committed offsets for partitions 0,1 of topic foo
+//        Map<TopicPartition, Long> lastCommittedOffsets = 
consumer.committed(null);
+//        // seek to the last committed offsets to avoid duplicates
+//        consumer.seek(lastCommittedOffsets);        
+//        // find the offsets of the latest available messages to know where 
to stop consumption
+//        Map<TopicPartition, Long> latestAvailableOffsets = 
consumer.offsetsBeforeTime(-2, null);
+//        boolean isRunning = true;
+//        while(isRunning) {
+//            Map<String, ConsumerRecords> records = consumer.poll(100);
+//            Map<TopicPartition, Long> consumedOffsets = process(records);
+//            for(TopicPartition partition : partitions) {
+//                if(consumedOffsets.get(partition) >= 
latestAvailableOffsets.get(partition))
+//                    isRunning = false;
+//                else
+//                    isRunning = true;
+//            }
+//        }
+//        consumer.close();
+//    }
+
+    /**
+     * This example demonstrates how the consumer can be used to subscribe to 
specific partitions of certain topics and consume upto the latest
+     * available message for each of those partitions before shutting down. 
When used to subscribe to specific partitions, the user foregoes 
+     * the group management functionality and instead relies on manually 
configuring the consumer instances to subscribe to a set of partitions.
+     * This example assumes that the user chooses to use custom offset storage.
+     */
+    @Test
+    public void testConsumerWithCustomOffsetManagement() {
+//        Properties props = new Properties();
+//        props.put("metadata.broker.list", "localhost:9092");
+//        KafkaConsumer consumer = new KafkaConsumer(props);
+//        // subscribe to some partitions of topic foo
+//        TopicPartition partition0 = new TopicPartition("foo", 0);
+//        TopicPartition partition1 = new TopicPartition("foo", 1);
+//        TopicPartition[] partitions = new TopicPartition[2];
+//        partitions[0] = partition0;
+//        partitions[1] = partition1;
+//        consumer.subscribe(partitions);
+//        Map<TopicPartition, Long> lastCommittedOffsets = 
getLastCommittedOffsetsFromCustomStore();
+//        // seek to the last committed offsets to avoid duplicates
+//        consumer.seek(lastCommittedOffsets);        
+//        // find the offsets of the latest available messages to know where 
to stop consumption
+//        Map<TopicPartition, Long> latestAvailableOffsets = 
consumer.offsetsBeforeTime(-2, null);
+//        boolean isRunning = true;
+//        while(isRunning) {
+//            Map<String, ConsumerRecords> records = consumer.poll(100);
+//            Map<TopicPartition, Long> consumedOffsets = process(records);
+//            // commit offsets for partitions 0,1 for topic foo to custom 
store
+//            commitOffsetsToCustomStore(consumedOffsets);
+//            for(TopicPartition partition : partitions) {
+//                if(consumedOffsets.get(partition) >= 
latestAvailableOffsets.get(partition))
+//                    isRunning = false;
+//                else
+//                    isRunning = true;
+//            }            
+//        }         
+//        consumer.close();
+    }
+
+    private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore() 
{ return null; }
+    private void commitOffsetsToCustomStore(Map<TopicPartition, Long> 
consumedOffsets) {}
+    private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> 
records) {
+        Map<TopicPartition, Long> processedOffsets = new 
HashMap<TopicPartition, Long>();
+        for(Entry<String, ConsumerRecords> recordMetadata : 
records.entrySet()) {
+            List<ConsumerRecord> recordsPerTopic = 
recordMetadata.getValue().records();
+            for(int i = 0;i < recordsPerTopic.size();i++) {
+                ConsumerRecord record = recordsPerTopic.get(i);
+                // process record
+                try {
+                    processedOffsets.put(record.topicAndPartition(), 
record.offset());
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }                
+            }
+        }
+        return processedOffsets; 
+    }
+}

Reply via email to