[
https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15345817#comment-15345817
]
ASF GitHub Bot commented on APEXMALHAR-2086:
--------------------------------------------
Github user siyuanh commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/298#discussion_r68179058
--- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java ---
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Kafka output operator with exactly-once processing semantics under certain conditions.
+ *
+ * This operator uses the Kafka message *key* to identify messages written by a
+ * particular instance of the output operator; for example, with application name
+ * "MyApp" and operator id 3, the key is "MyApp#3". Users of the operator can
+ * therefore store data only in the message *value*.
+ *
+ * @displayName Single Port Exactly Once Kafka Output(0.9.0)
+ * @category Messaging
+ * @tags output operator
+ *
+ */
[email protected]
+public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator<String, T>
+  implements Operator.CheckpointNotificationListener
+{
+ private transient String key;
+ private transient String appName;
+ private transient Integer operatorId;
+ private transient Long windowId;
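+  // Tuples from the partially written window that are already in Kafka,
+  // mapped to how many copies were recovered during replay.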
+ private transient Map<T, Integer> partialWindowTuples = new HashMap<>();
+ private transient KafkaConsumer consumer;
+
+ private WindowDataManager windowDataManager = new FSWindowDataManager();
+ private final int KAFKA_CONNECT_ATTEMPT = 10;
+ private final String KEY_SEPARATOR = "#";
+  private final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  private final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+
+  public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T tuple)
+ {
+ sendTuple(tuple);
+ }
+ };
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+
+ this.operatorId = context.getId();
+ this.windowDataManager.setup(context);
+ this.appName = context.getValue(Context.DAGContext.APPLICATION_NAME);
+    this.key = appName + KEY_SEPARATOR + operatorId;
+ this.consumer = KafkaConsumerInit();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ this.windowId = windowId;
+
+ if (windowId == windowDataManager.getLargestRecoveryWindow()) {
+ rebuildPartialWindow();
+ }
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
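+    // Nothing to do at checkpoint: offsets are saved per window in endWindow()
+    // and pruned in committed().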
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ windowDataManager.deleteUpTo(operatorId, windowId);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ }
+
+ @Override
+ public void teardown()
+ {
+ consumer.close();
+ super.teardown();
+ }
+
+ @Override
+ public void endWindow()
+ {
+    if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestRecoveryWindow()) {
+      throw new RuntimeException("Exactly-once guarantee violated: not all tuples from the partial window were received after the operator reset.");
+    }
+
+    try {
+      this.windowDataManager.save(getPartitionsAndOffsets(), operatorId, windowId);
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      // Swallowing these would let the window advance without saved offsets and
+      // silently break the exactly-once guarantee, so fail the operator instead.
+      throw new RuntimeException(e);
+    }
+ }
+
+ public WindowDataManager getWindowDataManager()
+ {
+ return windowDataManager;
+ }
+
+ public void setWindowDataManager(WindowDataManager windowDataManager)
+ {
+ this.windowDataManager = windowDataManager;
+ }
+
+  private boolean doesKeyBelongToThisInstance(Integer operatorId, String key)
+  {
+    String[] split = key.split(KEY_SEPARATOR);
+
+    if (split.length != 2) {
+      return false;
+    }
+
+    return (Integer.parseInt(split[1]) == operatorId) && split[0].equals(appName);
+  }
+
+  private boolean alreadyInKafka(T message)
+  {
+    // While replaying windows at or below the recovery window, every tuple was
+    // already written to Kafka before the failure.
+    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
+      return true;
+    }
+
+    // partialWindowTuples counts how many copies of each tuple from the
+    // partially written window are already in Kafka; consume one copy per match.
+    if (partialWindowTuples.containsKey(message)) {
+      Integer val = partialWindowTuples.get(message);
+
+      if (val == 0) {
+        return false;
+      } else if (val == 1) {
+        partialWindowTuples.remove(message);
+      } else {
+        partialWindowTuples.put(message, val - 1);
+      }
+
+      return true;
+    }
+
+    return false;
+  }
+
+  private Map<Integer, Long> getPartitionsAndOffsets() throws ExecutionException, InterruptedException
+  {
+    // Record the latest offset of every partition of the topic; this map is
+    // saved by the WindowDataManager at the end of each window.
+    List<PartitionInfo> partitionInfoList = consumer.partitionsFor(getTopic());
+    List<TopicPartition> topicPartitionList = new ArrayList<>();
+
+    for (PartitionInfo partitionInfo : partitionInfoList) {
+      topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()));
+    }
+
+    Map<Integer, Long> partitionsAndOffsets = new HashMap<>();
+    consumer.assign(topicPartitionList);
+
+    for (PartitionInfo partitionInfo : partitionInfoList) {
+      try {
+        TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition());
+        consumer.seekToEnd(topicPartition);
+        partitionsAndOffsets.put(partitionInfo.partition(), consumer.position(topicPartition));
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    return partitionsAndOffsets;
+  }
+
+  private void rebuildPartialWindow()
+  {
+    logger.info("Rebuilding the partial window after {}", windowDataManager.getLargestRecoveryWindow());
+
+    Map<Integer, Long> storedOffsets = getStoredOffsets();
+    Map<Integer, Long> currentOffsets = getCurrentOffsets();
+
+    if (storedOffsets == null || currentOffsets == null) {
+      logger.debug("No tuples found while rebuilding the partial window {}", windowDataManager.getLargestRecoveryWindow());
+      return;
+    }
+
+    List<TopicPartition> topicPartitions = new ArrayList<>();
+
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+      topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
+    }
+
+    consumer.assign(topicPartitions);
+
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+      Long storedOffset = 0L;
+      Integer currentPartition = entry.getKey();
+      Long currentOffset = entry.getValue();
+
+      if (storedOffsets.containsKey(currentPartition)) {
+        storedOffset = storedOffsets.get(currentPartition);
+      }
+
+      if (storedOffset >= currentOffset) {
+        continue;
+      }
+
+      // Replay everything this operator wrote between the last saved offset and
+      // the current end of the partition, counting each tuple so that
+      // alreadyInKafka() can skip tuples that were already written.
+      consumer.seek(new TopicPartition(getTopic(), currentPartition), storedOffset);
+
+      int kafkaAttempt = 0;
+
+      while (true) {
+        ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
+
+        if (consumerRecords.count() == 0) {
+          if (kafkaAttempt++ == KAFKA_CONNECT_ATTEMPT) {
+            break;
+          }
+        } else {
+          kafkaAttempt = 0;
+        }
+
+        boolean crossedBoundary = false;
+
+        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
+          if (!doesKeyBelongToThisInstance(operatorId, consumerRecord.key())) {
+            continue;
+          }
+
+          T value = (T)consumerRecord.value();
+
+          if (partialWindowTuples.containsKey(value)) {
+            Integer count = partialWindowTuples.get(value);
+            partialWindowTuples.put(value, count + 1);
+          } else {
+            partialWindowTuples.put(value, 1);
+          }
+
+          if (consumerRecord.offset() >= currentOffset) {
+            crossedBoundary = true;
+            break;
+          }
+        }
+
+        if (crossedBoundary) {
+          break;
+        }
+      }
+    }
+  }
+
+ private Map<Integer,Long> getCurrentOffsets()
+ {
+ Map<Integer, Long> currentOffsets;
+
+ try {
+ currentOffsets = getPartitionsAndOffsets();
+ } catch (ExecutionException e) {
--- End diff --
Group into one catch?
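A minimal sketch of the grouping suggested above, using Java 7 multi-catch and
assuming the individual catch blocks only wrap and rethrow the exception:

    try {
      currentOffsets = getPartitionsAndOffsets();
    } catch (ExecutionException | InterruptedException e) {
      // one handler replaces the separate, identical catch blocks
      throw new RuntimeException(e);
    }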
> Kafka Output Operator with Kafka 0.9 API
> ----------------------------------------
>
> Key: APEXMALHAR-2086
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2086
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Sandesh
> Assignee: Sandesh
>
> Goal: 2 operators for Kafka output
> 1. Simple Kafka Output Operator
> - Supports at-least-once delivery
> - Exposes the most used producer properties as class properties
> 2. Exactly Once Kafka Output (not possible in all cases; the limitations will be
> documented later)
>
> Design for Exactly Once
> Window Data Manager - Stores the Kafka partition offsets.
> Kafka Key - Used by the operator = AppID#OperatorId
> During recovery, the partially written window is re-created using the following
> approach:
> Tuples between the largest recovery offset and the current offset are checked.
> Based on the key, tuples written by other entities are discarded.
> Only tuples which are not in the recovered set are emitted.
> Tuples need to be unique within the window.
>
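A condensed, hypothetical sketch of the recovery check described above; the names
recoveredCounts, appId, tuple, and producer are illustrative, not part of the
actual operator:

    // A tuple is sent only if it is not among the copies already recovered
    // from the partially written window.
    String key = appId + "#" + operatorId;  // "AppID#OperatorId"
    if (recoveredCounts.containsKey(tuple)) {
      int remaining = recoveredCounts.get(tuple);
      if (remaining <= 1) {
        recoveredCounts.remove(tuple);  // last pending copy matched; skip the send
      } else {
        recoveredCounts.put(tuple, remaining - 1);
      }
    } else {
      producer.send(new ProducerRecord<>(getTopic(), key, tuple));  // not yet in Kafka
    }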
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)