[ https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15321699#comment-15321699 ]
ASF GitHub Bot commented on APEXMALHAR-2086:
--------------------------------------------
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/298#discussion_r66365356
--- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractExactlyOnceKafkaOutputOperator.java ---
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+// 1. JavaDoc
+// 2. Unit Test
+// 3. Comparator -- not required
+// 4. Generic Type
+// 5. remove e.printStackTrace()
+// 6. Should the Consumer be kept open?
+
+/**
+ * This is a base implementation of a Kafka output operator which, in most cases, guarantees to send tuples to Kafka only once.
+ * Subclasses should implement the methods for converting tuples into a format appropriate for Kafka.
+ * <p>
+ * Assuming the messages kept in Kafka are ordered by key, by value, or by key-value pair
+ * (for example, using timestamps as the key), this output operator always retrieves the last message from Kafka
+ * to use as the initial offset, so that replayed messages are not sent to Kafka again.
+ *
+ * This is not "perfect exactly once" in 2 cases:
+ * 1. Multiple producers produce messages to the same Kafka partition.
+ * 2. The operator is restarted after a message has been sent out but before Kafka has
+ * synchronized that message among all the brokers.
+ *
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: One input port<br>
+ * <b>Output</b>: No output port<br>
+ * <br>
+ * Properties:<br>
+ * configProperties<br>
+ * <br>
+ * Compile time checks:<br>
+ * Classes derived from this one have to implement 2 methods:<br>
+ * tupleToKeyValue() to convert input tuples to Kafka key-value objects<br>
+ * compareToLastMsg() to compare an incoming tuple with the last message received from Kafka, so that the operator can skip the ones already sent<br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * </p>
+ *
+ * @displayName Abstract Exactly Once Kafka Output (0.9.0)
+ * @category Messaging
+ * @tags output operator
+ *
+ * @since 3.5
+ */
+public abstract class AbstractExactlyOnceKafkaOutputOperator<T> extends AbstractKafkaOutputOperator<String, T>
+ implements Operator.CheckpointNotificationListener
+{
+ private WindowDataManager windowDataManager = new FSWindowDataManager();
+ private String key;
+ private Integer operatorId;
+ private String appId;
+ private transient Long windowId;
+ private transient Set<T> recoveredTuples = new HashSet<>();
+ private int KAFKA_CONNECT_ATTEMPT = 10;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ this.operatorId = context.getId();
+ this.windowDataManager.setup(context);
+ this.appId = context.getValue(Context.DAGContext.APPLICATION_ID);
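+    // Key identifying messages written by this operator instance (AppID#OperatorId);
+    // during recovery it is used to discard messages written by other producers.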
+    this.key = appId + '#' + operatorId;
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ this.windowId = windowId;
+
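+    // Replaying the window that was cut short by the failure: rebuild the set
+    // of tuples that already reached Kafka in that window.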
+ if (windowId == windowDataManager.getLargestRecoveryWindow()) {
+ rebuildLastWindow();
+ }
+ }
+
+  // Only the tuples in the incomplete window need to be read back from Kafka.
+  // Re-sent tuples from the previous windows are not written to Kafka again.
+ private void rebuildLastWindow()
+ {
+ recoveredTuples.clear();
+
+ Map<Integer,Long> storedOffsets = getStoredOffsets();
+ Map<Integer,Long> currentOffsets = getCurrentOffsets();
+
+ if (storedOffsets == null || currentOffsets == null) {
+ //TODO: Take some action
+ return;
+ }
+
+    KafkaConsumer consumer = kafkaConsumerInit();
+
+ List<TopicPartition> topicPartitions = new ArrayList<>();
+
+ for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
+
+ topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
+ }
+
+ consumer.assign(topicPartitions);
+
+    // For each partition in the topic:
+    // 1. Read the tuples between the stored offset and the latest offset.
+    // 2. Check the key to filter out messages from other writers.
+    // 3. Store the recovered tuples in the HashSet.
+
+ for (Map.Entry<Integer,Long> entry: currentOffsets.entrySet()) {
+
+ Long storedOffset = 0L;
+ Integer currentPartition = entry.getKey();
+ Long currentOffset = entry.getValue();
+
+ if (storedOffsets.containsKey(currentPartition)) {
+ storedOffset = storedOffsets.get(currentPartition);
+ }
+
+ if (storedOffset >= currentOffset) {
+ continue;
+ }
+
+      consumer.seek(new TopicPartition(getTopic(), currentPartition), storedOffset);
+
+ int kafkaAttempt = 0;
+
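+      // Poll from the stored offset up to the current offset, giving up after
+      // KAFKA_CONNECT_ATTEMPT consecutive empty polls.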
+      while (true) {
+
+        ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
+
+ if (consumerRecords.count() == 0) {
+ ++kafkaAttempt;
+
+ if (kafkaAttempt == KAFKA_CONNECT_ATTEMPT) {
+ break;
+ }
+ } else {
+ kafkaAttempt = 0;
+ }
+
+ boolean crossedBoundary = false;
+
+ for (ConsumerRecord consumerRecord : consumerRecords) {
+
+          if (!doesKeyBelongsThisInstance(operatorId, (String)consumerRecord.key())) {
+ continue;
+ }
+
+ recoveredTuples.add((T)consumerRecord.value());
+
+          if (consumerRecord.offset() >= currentOffset) {
+ crossedBoundary = true;
+ break;
+ }
+ }
+
+        if (crossedBoundary) {
+          break;
+        }
+ }
+ }
+
+ consumer.close();
+ }
+
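+  // Current (latest) offset of each partition of the topic, as reported by Kafka.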
+ private Map<Integer,Long> getCurrentOffsets()
+ {
+ Map<Integer, Long> currentOffsets = null;
+
+ try {
+ currentOffsets = getPartitionsAndOffsets();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return currentOffsets;
+ }
+
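+  // Per-partition offsets saved through the WindowDataManager for the window being recovered.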
+ private Map<Integer,Long> getStoredOffsets()
+ {
+ Map<Integer,Long> storedOffsets = null;
+ try {
+      storedOffsets = (Map<Integer,Long>)this.windowDataManager.load(operatorId, windowId);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return storedOffsets;
+ }
+
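+  // Builds the consumer used to read this operator's messages back during recovery.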
+  private KafkaConsumer kafkaConsumerInit()
+ {
+ Properties props = new Properties();
+ props.put("bootstrap.servers",
getConfigProperties().get("bootstrap.servers"));
--- End diff ---
Can you use Java constants instead of hardcoding property strings?
You can find the public static String constants in org.apache.kafka.clients.producer.ProducerConfig.
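
A minimal sketch of that suggestion, assuming the kafka-clients 0.9 jar on the
classpath (ProducerConfig carries the producer property names; ConsumerConfig
exposes the matching constants for the consumer side):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG resolves to "bootstrap.servers",
    // so the property name is no longer hardcoded as a string literal.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        getConfigProperties().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));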
> Kafka Output Operator with Kafka 0.9 API
> ----------------------------------------
>
> Key: APEXMALHAR-2086
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2086
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Sandesh
> Assignee: Sandesh
>
> Goal: 2 Operators for Kafka Output
> 1. Simple Kafka Output Operator
> - Supports at-least-once delivery
> - Exposes the most used producer properties as class properties
> 2. Exactly Once Kafka Output (not possible in all cases; the exceptions will be documented later)
>
> Design for Exactly Once
> Window Data Manager - stores the Kafka partition offsets.
> Kafka key used by the operator = AppID#OperatorId
> During recovery, the partially written window is re-created using the following approach:
> Tuples between the largest recovery offset and the current offset are checked. Based on the key, tuples written by other entities are discarded.
> Only tuples which are not in the recovered set are emitted (a sketch of this check follows below).
> Tuples need to be unique within the window.
>
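For illustration, a minimal sketch of how the recovered set could be consulted
when tuples are replayed. The processTuple path is not part of the diff above,
so apart from recoveredTuples, key, getTopic() and ProducerRecord, the names
here (including the base-class getProducer()) are assumptions:

    // Hypothetical replay check, not the committed implementation.
    public void processTuple(T tuple)
    {
      if (recoveredTuples.contains(tuple)) {
        // Already written to Kafka before the failure; sending it again would
        // break exactly-once. This is why tuples must be unique within a window.
        return;
      }
      getProducer().send(new ProducerRecord<>(getTopic(), key, tuple));
    }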
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)