http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java deleted file mode 100644 index a38c3bd..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka.internals; - -import kafka.producer.Partitioner; -import kafka.utils.VerifiableProperties; - -/** - * Hacky wrapper to send an object instance through a Properties - map. - * - * This works as follows: - * The recommended way of creating a KafkaSink is specifying a classname for the partitioner. - * - * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink. - * This is set in the key-value (java.util.Properties) map. - * In addition to that, we use the Properties.put(Object, Object) to store the instance of the (serializable). - * This is a hack because the put() method is called on the underlying Hashmap. - * - * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance. - * - * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there. - */ -public class PartitionerWrapper implements Partitioner { - public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized"; - - private Partitioner wrapped; - public PartitionerWrapper(VerifiableProperties properties) { - wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME); - } - - @Override - public int partition(Object value, int numberOfPartitions) { - return wrapped.partition(value, numberOfPartitions); - } -}
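The PartitionerWrapper javadoc above describes how a serializable Partitioner instance is smuggled through a java.util.Properties map: Kafka is told to instantiate the wrapper by class name, while the real instance is stored via the Hashtable-level put(Object, Object) under SERIALIZED_WRAPPER_NAME. A minimal sketch of the producer-side use of this removed mechanism follows; it is an illustrative fragment, not part of this commit, and the "partitioner.class" key plus the MyCustomPartitioner class are assumptions for illustration only.

    import java.util.Properties;
    import kafka.producer.Partitioner;

    Properties producerProps = new Properties();
    // Kafka instantiates the partitioner by class name, so hand it Flink's wrapper...
    producerProps.setProperty("partitioner.class", PartitionerWrapper.class.getName());
    // ...and stash the actual (serializable) partitioner instance in the underlying
    // Hashtable, bypassing the String-only Properties API.
    Partitioner userPartitioner = new MyCustomPartitioner(); // illustrative user class
    producerProps.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, userPartitioner);
    // When Kafka later constructs PartitionerWrapper with a VerifiableProperties view
    // of these properties, the wrapper pulls the instance back out and delegates
    // partition(value, numberOfPartitions) to it.
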
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java new file mode 100644 index 0000000..6aaeca9 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import java.util.HashMap; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A thread that periodically writes the current Kafka partition offsets to Zookeeper. 
+ */ +public class PeriodicOffsetCommitter extends Thread { + + /** The ZooKeeper handler */ + private final ZookeeperOffsetHandler offsetHandler; + + private final KafkaTopicPartitionState<?>[] partitionStates; + + /** The proxy to forward exceptions to the main thread */ + private final ExceptionProxy errorHandler; + + /** Interval in which to commit, in milliseconds */ + private final long commitInterval; + + /** Flag to mark the periodic committer as running */ + private volatile boolean running = true; + + PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler, + KafkaTopicPartitionState<?>[] partitionStates, + ExceptionProxy errorHandler, + long commitInterval) + { + this.offsetHandler = checkNotNull(offsetHandler); + this.partitionStates = checkNotNull(partitionStates); + this.errorHandler = checkNotNull(errorHandler); + this.commitInterval = commitInterval; + + checkArgument(commitInterval > 0); + } + + @Override + public void run() { + try { + while (running) { + Thread.sleep(commitInterval); + + // create copy a deep copy of the current offsets + HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(partitionStates.length); + for (KafkaTopicPartitionState<?> partitionState : partitionStates) { + currentOffsets.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset()); + } + + offsetHandler.writeOffsets(currentOffsets); + } + } + catch (Throwable t) { + if (running) { + errorHandler.reportError( + new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t)); + } + } + } + + public void shutdown() { + this.running = false; + this.interrupt(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java new file mode 100644 index 0000000..491ffad --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import kafka.api.FetchRequestBuilder; +import kafka.api.PartitionOffsetRequestInfo; +import kafka.common.ErrorMapping; +import kafka.common.TopicAndPartition; +import kafka.javaapi.FetchResponse; +import kafka.javaapi.OffsetResponse; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.MessageAndOffset; + +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.StringUtils; + +import org.apache.kafka.common.Node; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig; + +/** + * This class implements a thread with a connection to a single Kafka broker. The thread + * pulls records for a set of topic partitions for which the connected broker is currently + * the leader. The thread deserializes these records and emits them. + * + * @param <T> The type of elements that this consumer thread creates from Kafka's byte messages + * and emits into the Flink DataStream. + */ +class SimpleConsumerThread<T> extends Thread { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class); + + private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER; + + // ------------------------------------------------------------------------ + + private final Kafka08Fetcher<T> owner; + + private final KeyedDeserializationSchema<T> deserializer; + + private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions; + + private final Node broker; + + /** Queue containing new fetch partitions for the consumer thread */ + private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue; + + private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions; + + private final ExceptionProxy errorHandler; + + private final long invalidOffsetBehavior; + + private volatile boolean running = true; + + + // ----------------- Simple Consumer ---------------------- + private volatile SimpleConsumer consumer; + + private final int soTimeout; + private final int minBytes; + private final int maxWait; + private final int fetchSize; + private final int bufferSize; + private final int reconnectLimit; + + + // exceptions are thrown locally + public SimpleConsumerThread( + Kafka08Fetcher<T> owner, + ExceptionProxy errorHandler, + Properties config, + Node broker, + List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions, + ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions, + KeyedDeserializationSchema<T> deserializer, + long invalidOffsetBehavior) + { + this.owner = owner; + this.errorHandler = errorHandler; + this.broker = broker; + this.partitions = seedPartitions; + this.deserializer = requireNonNull(deserializer); + this.unassignedPartitions = requireNonNull(unassignedPartitions); + this.newPartitionsQueue = new ClosableBlockingQueue<>(); + this.invalidOffsetBehavior = invalidOffsetBehavior; + + // these are the actual configuration 
values of Kafka + their original default values. + this.soTimeout = getIntFromConfig(config, "socket.timeout.ms", 30000); + this.minBytes = getIntFromConfig(config, "fetch.min.bytes", 1); + this.maxWait = getIntFromConfig(config, "fetch.wait.max.ms", 100); + this.fetchSize = getIntFromConfig(config, "fetch.message.max.bytes", 1048576); + this.bufferSize = getIntFromConfig(config, "socket.receive.buffer.bytes", 65536); + this.reconnectLimit = getIntFromConfig(config, "flink.simple-consumer-reconnectLimit", 3); + } + + public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() { + return newPartitionsQueue; + } + + // ------------------------------------------------------------------------ + // main work loop + // ------------------------------------------------------------------------ + + @Override + public void run() { + LOG.info("Starting to fetch from {}", this.partitions); + + // set up the config values + final String clientId = "flink-kafka-consumer-legacy-" + broker.id(); + + try { + // create the Kafka consumer that we actually use for fetching + consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); + + // make sure that all partitions have some offsets to start with + // those partitions that do not have an offset from a checkpoint need to get + // their start offset from ZooKeeper + getMissingOffsetsFromKafka(partitions); + + // Now, the actual work starts :-) + int offsetOutOfRangeCount = 0; + int reconnects = 0; + while (running) { + + // ----------------------------------- partitions list maintenance ---------------------------- + + // check queue for new partitions to read from: + List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch(); + if (newPartitions != null) { + // found some new partitions for this thread's broker + + // check if the new partitions need an offset lookup + getMissingOffsetsFromKafka(newPartitions); + + // add the new partitions (and check they are not already in there) + for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) { + if (partitions.contains(newPartition)) { + throw new IllegalStateException("Adding partition " + newPartition + + " to subscribed partitions even though it is already subscribed"); + } + partitions.add(newPartition); + } + + LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName()); + LOG.debug("Partitions list: {}", newPartitions); + } + + if (partitions.size() == 0) { + if (newPartitionsQueue.close()) { + // close succeeded. Closing thread + running = false; + + LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", + getName()); + + // add the wake-up marker into the queue to make the main thread + // immediately wake up and termination faster + unassignedPartitions.add(MARKER); + + break; + } else { + // close failed: fetcher main thread concurrently added new partitions into the queue. 
+ // go to top of loop again and get the new partitions + continue; + } + } + + // ----------------------------------- request / response with kafka ---------------------------- + + FetchRequestBuilder frb = new FetchRequestBuilder(); + frb.clientId(clientId); + frb.maxWait(maxWait); + frb.minBytes(minBytes); + + for (KafkaTopicPartitionState<?> partition : partitions) { + frb.addFetch( + partition.getKafkaTopicPartition().getTopic(), + partition.getKafkaTopicPartition().getPartition(), + partition.getOffset() + 1, // request the next record + fetchSize); + } + + kafka.api.FetchRequest fetchRequest = frb.build(); + LOG.debug("Issuing fetch request {}", fetchRequest); + + FetchResponse fetchResponse; + try { + fetchResponse = consumer.fetch(fetchRequest); + } + catch (Throwable cce) { + //noinspection ConstantConditions + if (cce instanceof ClosedChannelException) { + LOG.warn("Fetch failed because of ClosedChannelException."); + LOG.debug("Full exception", cce); + + // we don't know if the broker is overloaded or unavailable. + // retry a few times, then return ALL partitions for new leader lookup + if (++reconnects >= reconnectLimit) { + LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit); + for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) { + unassignedPartitions.add(fp); + } + this.partitions.clear(); + continue; // jump to top of loop: will close thread or subscribe to new partitions + } + try { + consumer.close(); + } catch (Throwable t) { + LOG.warn("Error while closing consumer connection", t); + } + // delay & retry + Thread.sleep(100); + consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); + continue; // retry + } else { + throw cce; + } + } + reconnects = 0; + + // ---------------------------------------- error handling ---------------------------- + + if (fetchResponse == null) { + throw new IOException("Fetch from Kafka failed (request returned null)"); + } + + if (fetchResponse.hasError()) { + String exception = ""; + List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>(); + + // iterate over partitions to get individual error codes + Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator(); + boolean partitionsRemoved = false; + + while (partitionsIterator.hasNext()) { + final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next(); + short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition()); + + if (code == ErrorMapping.OffsetOutOfRangeCode()) { + // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper) + // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset' + partitionsToGetOffsetsFor.add(fp); + } + else if (code == ErrorMapping.NotLeaderForPartitionCode() || + code == ErrorMapping.LeaderNotAvailableCode() || + code == ErrorMapping.BrokerNotAvailableCode() || + code == ErrorMapping.UnknownCode()) + { + // the broker we are connected to is not the leader for the partition. + LOG.warn("{} is not the leader of {}. 
Reassigning leader for partition", broker, fp); + LOG.debug("Error code = {}", code); + + unassignedPartitions.add(fp); + + partitionsIterator.remove(); // unsubscribe the partition ourselves + partitionsRemoved = true; + } + else if (code != ErrorMapping.NoError()) { + exception += "\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " + + StringUtils.stringifyException(ErrorMapping.exceptionFor(code)); + } + } + if (partitionsToGetOffsetsFor.size() > 0) { + // safeguard against an infinite loop. + if (offsetOutOfRangeCount++ > 3) { + throw new RuntimeException("Found invalid offsets more than three times in partitions " + + partitionsToGetOffsetsFor + " Exceptions: " + exception); + } + // get valid offsets for these partitions and try again. + LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor); + getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); + + LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor); + continue; // jump back to create a new fetch request. The offset has not been touched. + } + else if (partitionsRemoved) { + continue; // create new fetch request + } + else { + // partitions failed on an error + throw new IOException("Error while fetching from broker '" + broker +"': " + exception); + } + } else { + // successful fetch, reset offsetOutOfRangeCount. + offsetOutOfRangeCount = 0; + } + + // ----------------------------------- process fetch response ---------------------------- + + int messagesInFetch = 0; + int deletedMessages = 0; + Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator(); + + partitionsLoop: + while (partitionsIterator.hasNext()) { + final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next(); + + final ByteBufferMessageSet messageSet = fetchResponse.messageSet( + currentPartition.getTopic(), currentPartition.getPartition()); + + for (MessageAndOffset msg : messageSet) { + if (running) { + messagesInFetch++; + final ByteBuffer payload = msg.message().payload(); + final long offset = msg.offset(); + + if (offset <= currentPartition.getOffset()) { + // we have seen this message already + LOG.info("Skipping message with offset " + msg.offset() + + " because we have seen messages until (including) " + + currentPartition.getOffset() + + " from topic/partition " + currentPartition.getTopic() + '/' + + currentPartition.getPartition() + " already"); + continue; + } + + // If the message value is null, this represents a delete command for the message key. + // Log this and pass it on to the client who might want to also receive delete messages. + byte[] valueBytes; + if (payload == null) { + deletedMessages++; + valueBytes = null; + } else { + valueBytes = new byte[payload.remaining()]; + payload.get(valueBytes); + } + + // put key into byte array + byte[] keyBytes = null; + int keySize = msg.message().keySize(); + + if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization + ByteBuffer keyPayload = msg.message().key(); + keyBytes = new byte[keySize]; + keyPayload.get(keyBytes); + } + + final T value = deserializer.deserialize(keyBytes, valueBytes, + currentPartition.getTopic(), currentPartition.getPartition(), offset); + + if (deserializer.isEndOfStream(value)) { + // remove partition from subscribed partitions. 
+ partitionsIterator.remove(); + continue partitionsLoop; + } + + owner.emitRecord(value, currentPartition, offset); + } + else { + // no longer running + return; + } + } + } + LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages); + } // end of fetch loop + + if (!newPartitionsQueue.close()) { + throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue."); + } + } + catch (Throwable t) { + // report to the fetcher's error handler + errorHandler.reportError(t); + } + finally { + if (consumer != null) { + // closing the consumer should not fail the program + try { + consumer.close(); + } + catch (Throwable t) { + LOG.error("Error while closing the Kafka simple consumer", t); + } + } + } + } + + private void getMissingOffsetsFromKafka( + List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException + { + // collect which partitions we should fetch offsets for + List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>(); + for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) { + if (!part.isOffsetDefined()) { + // retrieve the offset from the consumer + partitionsToGetOffsetsFor.add(part); + } + } + + if (partitionsToGetOffsetsFor.size() > 0) { + getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); + + LOG.info("No checkpoint/savepoint offsets found for some partitions. " + + "Fetched the following start offsets {}", partitionsToGetOffsetsFor); + } + } + + /** + * Cancels this fetch thread. The thread will release all resources and terminate. + */ + public void cancel() { + this.running = false; + + // interrupt whatever the consumer is doing + if (consumer != null) { + consumer.close(); + } + + this.interrupt(); + } + + // ------------------------------------------------------------------------ + // Kafka Request Utils + // ------------------------------------------------------------------------ + + /** + * Request latest offsets for a set of partitions, via a Kafka consumer. + * + * <p>This method retries three times if the response has an error. + * + * @param consumer The consumer connected to lead broker + * @param partitions The list of partitions we need offsets for + * @param whichTime The type of time we are requesting. 
-1 and -2 are special constants (See OffsetRequest) + */ + private static void getLastOffsetFromKafka( + SimpleConsumer consumer, + List<KafkaTopicPartitionState<TopicAndPartition>> partitions, + long whichTime) throws IOException + { + Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>(); + for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) { + requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1)); + } + + int retries = 0; + OffsetResponse response; + while (true) { + kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( + requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); + response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + StringBuilder exception = new StringBuilder(); + for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) { + short code; + if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) { + exception.append("\nException for topic=").append(part.getTopic()) + .append(" partition=").append(part.getPartition()).append(": ") + .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code))); + } + } + if (++retries >= 3) { + throw new IOException("Unable to get last offset for partitions " + partitions + ": " + + exception.toString()); + } else { + LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception); + } + } else { + break; // leave retry loop + } + } + + for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) { + final long offset = response.offsets(part.getTopic(), part.getPartition())[0]; + + // the offset returned is that of the next record to fetch. because our state reflects the latest + // successfully emitted record, we subtract one + part.setOffset(offset - 1); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java index 328cab0..a1a81ed 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java @@ -24,7 +24,6 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.slf4j.Logger; @@ -39,11 +38,9 @@ import java.util.Properties; /** * Handler for committing Kafka offsets to Zookeeper and to retrieve them again. 
*/ -public class ZookeeperOffsetHandler implements OffsetHandler { +public class ZookeeperOffsetHandler { private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class); - - private static final long OFFSET_NOT_SET = FlinkKafkaConsumerBase.OFFSET_NOT_SET; private final String groupId; @@ -74,27 +71,40 @@ public class ZookeeperOffsetHandler implements OffsetHandler { curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); curatorClient.start(); } + + // ------------------------------------------------------------------------ + // Offset access and manipulation + // ------------------------------------------------------------------------ - - @Override - public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception { - for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToCommit.entrySet()) { + /** + * Writes given set of offsets for Kafka partitions to ZooKeeper. + * + * @param offsetsToWrite The offsets for the partitions to write. + * @throws Exception The method forwards exceptions. + */ + public void writeOffsets(Map<KafkaTopicPartition, Long> offsetsToWrite) throws Exception { + for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToWrite.entrySet()) { KafkaTopicPartition tp = entry.getKey(); long offset = entry.getValue(); - + if (offset >= 0) { setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset); } } } - @Override + /** + * + * @param partitions The partitions to read offsets for. + * @return The mapping from partition to offset. + * @throws Exception This method forwards exceptions. + */ public Map<KafkaTopicPartition, Long> getOffsets(List<KafkaTopicPartition> partitions) throws Exception { Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size()); for (KafkaTopicPartition tp : partitions) { - long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition()); + Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition()); - if (offset != OFFSET_NOT_SET) { + if (offset != null) { LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.", tp.getTopic(), tp.getPartition(), offset); ret.put(tp, offset); @@ -103,7 +113,11 @@ public class ZookeeperOffsetHandler implements OffsetHandler { return ret; } - @Override + /** + * Closes the offset handler. + * + * @throws IOException Thrown, if the handler cannot be closed properly. 
+ */ public void close() throws IOException { curatorClient.close(); } @@ -120,7 +134,7 @@ public class ZookeeperOffsetHandler implements OffsetHandler { curatorClient.setData().forPath(path, data); } - public static long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception { + public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception { ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); String path = topicDirs.consumerOffsetDir() + "/" + partition; curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient()); @@ -128,18 +142,20 @@ public class ZookeeperOffsetHandler implements OffsetHandler { byte[] data = curatorClient.getData().forPath(path); if (data == null) { - return OFFSET_NOT_SET; + return null; } else { String asString = new String(data); if (asString.length() == 0) { - return OFFSET_NOT_SET; + return null; } else { try { - return Long.parseLong(asString); - } catch (NumberFormatException e) { - throw new Exception(String.format( - "The offset in ZooKeeper for group '%s', topic '%s', partition %d is a malformed string: %s", - groupId, topic, partition, asString)); + return Long.valueOf(asString); + } + catch (NumberFormatException e) { + LOG.error( + "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}", + groupId, topic, partition, asString); + return null; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index d6ee968..0aef3bd 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -22,7 +22,6 @@ import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -35,8 +34,11 @@ import org.junit.Test; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class Kafka08ITCase extends KafkaConsumerTestBase { @@ -45,11 +47,6 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { // ------------------------------------------------------------------------ @Test(timeout = 60000) - public void testCheckpointing() throws Exception { - runCheckpointingTest(); - } - - @Test(timeout = 60000) public void 
testFailOnNoBroker() throws Exception { runFailOnNoBrokerTest(); } @@ -60,15 +57,15 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { runSimpleConcurrentProducerConsumerTopology(); } - @Test(timeout = 60000) - public void testPunctuatedExplicitWMConsumer() throws Exception { - runExplicitPunctuatedWMgeneratingConsumerTest(false); - } +// @Test(timeout = 60000) +// public void testPunctuatedExplicitWMConsumer() throws Exception { +// runExplicitPunctuatedWMgeneratingConsumerTest(false); +// } - @Test(timeout = 60000) - public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { - runExplicitPunctuatedWMgeneratingConsumerTest(true); - } +// @Test(timeout = 60000) +// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { +// runExplicitPunctuatedWMgeneratingConsumerTest(true); +// } @Test(timeout = 60000) public void testKeyValueSupport() throws Exception { @@ -164,7 +161,31 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { runMetricsAndEndOfStreamTest(); } + @Test + public void runOffsetManipulationInZooKeeperTest() { + try { + final String topicName = "ZookeeperOffsetHandlerTest-Topic"; + final String groupId = "ZookeeperOffsetHandlerTest-Group"; + final Long offset = (long) (Math.random() * Long.MAX_VALUE); + + CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient(); + kafkaServer.createTestTopic(topicName, 3, 2); + + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset); + + Long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0); + + curatorFramework.close(); + + assertEquals(offset, fetchedOffset); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + /** * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper. 
* @@ -202,15 +223,15 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); - long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0); - long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1); - long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2); + Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0); + Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1); + Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2); LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); - assertTrue(o1 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100)); - assertTrue(o2 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100)); - assertTrue(o3 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100)); + assertTrue(o1 == null || (o1 >= 0 && o1 <= 100)); + assertTrue(o2 == null || (o2 >= 0 && o2 <= 100)); + assertTrue(o3 == null || (o3 >= 0 && o3 <= 100)); LOG.info("Manipulating offsets"); @@ -264,16 +285,16 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { // get the offset CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); - long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0); - long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1); - long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2); + Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0); + Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1); + Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2); curatorFramework.close(); LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); // ensure that the offset has been committed - boolean atLeastOneOffsetSet = (o1 > 0 && o1 <= 100) || - (o2 > 0 && o2 <= 100) || - (o3 > 0 && o3 <= 100); + boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) || + (o2 != null && o2 > 0 && o2 <= 100) || + (o3 != null && o3 > 0 && o3 <= 100); assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet); deleteTestTopic(topicName); @@ -295,7 +316,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { public void testKafkaOffsetRetrievalToZookeeper() throws Exception { final String topicName = "testKafkaOffsetToZk"; final int parallelism = 3; + createTestTopic(topicName, parallelism, 1); + StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env1.getConfig().disableSysoutLogging(); env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); @@ -305,9 +328,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { 
writeSequence(env1, topicName, 50, parallelism); - StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - - // enable checkpointing + final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env2.getConfig().disableSysoutLogging(); env2.getConfig().setRestartStrategy(RestartStrategies.noRestart()); env2.setParallelism(parallelism); @@ -320,78 +341,55 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { DataStream<String> stream = env2.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps)); stream.addSink(new DiscardingSink<String>()); - CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); - String consumerGroupDir = standardProps.getProperty("group.id"); - TreeCache tc1 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/0"); - TreeCache tc2 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/1"); - TreeCache tc3 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/2"); - - // add listener to wait until first partition is updated in ZK - TreeCacheListener stopListener = new TreeCacheListener() { - AtomicInteger counter = new AtomicInteger(0); - @Override - public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { - LOG.info("Updated {}", event); - if (event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) { - if(counter.incrementAndGet() == 3) { - // cancel job, node has been created - LOG.info("Cancelling job after all three ZK nodes were updated"); - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); - } - } - } - }; - tc1.getListenable().addListener(stopListener); - tc1.start(); - tc2.getListenable().addListener(stopListener); - tc2.start(); - tc3.getListenable().addListener(stopListener); - tc3.start(); - - // the curator listener is not always working properly. 
Stop job after 10 seconds - final Tuple1<Throwable> error = new Tuple1<>(); - Thread canceller = new Thread(new Runnable() { + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + final Thread runner = new Thread("runner") { @Override public void run() { try { - Thread.sleep(10_000L); - LOG.info("Cancelling job after 10 seconds"); - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); - } catch (Throwable t) { - if (!(t instanceof InterruptedException)) { - error.f0 = t; + env2.execute(); + } + catch (Throwable t) { + if (!(t.getCause() instanceof JobCancellationException)) { + errorRef.set(t); } } } - }); - canceller.start(); - - try { - env2.execute("Idlying Kafka source"); - } catch( Throwable thr) { - if(!(thr.getCause() instanceof JobCancellationException)) { - throw thr; + }; + runner.start(); + + final CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); + final Long l49 = 49L; + + final long deadline = 30000 + System.currentTimeMillis(); + do { + Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0); + Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1); + Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2); + + if (l49.equals(o1) && l49.equals(o2) && l49.equals(o3)) { + break; } + + Thread.sleep(100); } - tc1.close(); - tc2.close(); - tc3.close(); - - canceller.interrupt(); - canceller.join(); - if(error.f0 != null) { - throw new RuntimeException("Delayed cancelling thread had an error", error.f0); + while (System.currentTimeMillis() < deadline); + + // cancel the job + JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + + final Throwable t = errorRef.get(); + if (t != null) { + throw new RuntimeException("Job failed with an exception", t); } // check if offsets are correctly in ZK - long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0); - long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1); - long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2); - Assert.assertEquals(49L, o1); - Assert.assertEquals(49L, o2); - Assert.assertEquals(49L, o3); + Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0); + Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1); + Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2); + Assert.assertEquals(Long.valueOf(49L), o1); + Assert.assertEquals(Long.valueOf(49L), o2); + Assert.assertEquals(Long.valueOf(49L), o3); curatorFramework.close(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java new file mode 100644 index 0000000..36fb7e6 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import org.junit.Test; + +import java.util.Collections; +import java.util.Properties; + +import static org.junit.Assert.*; + +public class KafkaConsumer08Test { + + @Test + public void testValidateZooKeeperConfig() { + try { + // empty + Properties emptyProperties = new Properties(); + try { + FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + // no connect string (only group string) + Properties noConnect = new Properties(); + noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group"); + try { + FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + // no group string (only connect string) + Properties noGroup = new Properties(); + noGroup.put("zookeeper.connect", "localhost:47574"); + try { + FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCreateSourceWithoutCluster() { + try { + Properties props = new Properties(); + props.setProperty("zookeeper.connect", "localhost:56794"); + props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222"); + props.setProperty("group.id", "non-existent-group"); + + new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props); + fail(); + } + catch (Exception e) { + assertTrue(e.getMessage().contains("Unable to retrieve any partitions")); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java deleted file mode 100644 index 7337f65..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.commons.collections.map.LinkedMap; - -import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.junit.Ignore; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class KafkaConsumerTest { - - @Test - public void testValidateZooKeeperConfig() { - try { - // empty - Properties emptyProperties = new Properties(); - try { - FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties); - fail("should fail with an exception"); - } - catch (IllegalArgumentException e) { - // expected - } - - // no connect string (only group string) - Properties noConnect = new Properties(); - noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group"); - try { - FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect); - fail("should fail with an exception"); - } - catch (IllegalArgumentException e) { - // expected - } - - // no group string (only connect string) - Properties noGroup = new Properties(); - noGroup.put("zookeeper.connect", "localhost:47574"); - try { - FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup); - fail("should fail with an exception"); - } - catch (IllegalArgumentException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSnapshot() { - try { - Field offsetsField = FlinkKafkaConsumerBase.class.getDeclaredField("partitionState"); - Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running"); - Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints"); - - offsetsField.setAccessible(true); - runningField.setAccessible(true); - mapField.setAccessible(true); - - FlinkKafkaConsumer08<?> consumer = mock(FlinkKafkaConsumer08.class); - when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod(); - - HashMap<KafkaTopicPartition, KafkaPartitionState> testState = new HashMap<>(); - 
HashMap<KafkaTopicPartition, Long> testOffsets = new HashMap<>(); - long[] offsets = new long[] { 43, 6146, 133, 16, 162, 616 }; - int j = 0; - for (long i: offsets) { - KafkaTopicPartition ktp = new KafkaTopicPartition("topic", j++); - testState.put(ktp, new KafkaPartitionState(ktp.getPartition(), i)); - testOffsets.put(ktp, i); - } - - LinkedMap map = new LinkedMap(); - - offsetsField.set(consumer, testState); - runningField.set(consumer, true); - mapField.set(consumer, map); - - assertTrue(map.isEmpty()); - - // make multiple checkpoints - for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) { - HashMap<KafkaTopicPartition, Long> checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId); - assertEquals(testOffsets, checkpoint); - - // change the offsets, make sure the snapshot did not change - HashMap<KafkaTopicPartition, Long> checkpointCopy = (HashMap<KafkaTopicPartition, Long>) checkpoint.clone(); - - for (Map.Entry<KafkaTopicPartition, Long> e: testOffsets.entrySet()) { - KafkaTopicPartition ktp = e.getKey(); - testState.put(ktp, new KafkaPartitionState(ktp.getPartition(), e.getValue() + 1)); - testOffsets.put(ktp, e.getValue() + 1); - } - - assertEquals(checkpointCopy, checkpoint); - - assertTrue(map.size() > 0); - assertTrue(map.size() <= FlinkKafkaConsumer08.MAX_NUM_PENDING_CHECKPOINTS); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - @Ignore("Kafka consumer internally makes an infinite loop") - public void testCreateSourceWithoutCluster() { - try { - Properties props = new Properties(); - props.setProperty("zookeeper.connect", "localhost:56794"); - props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222"); - props.setProperty("group.id", "non-existent-group"); - - new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java index 21140dd..c28799c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.junit.Test; +@SuppressWarnings("serial") public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase { @Test(timeout=60000) @@ -28,6 +29,6 @@ public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase { @Test(timeout=60000) public void testAutoOffsetResetNone() throws Exception { - runFailOnAutoOffsetResetNone(); + runFailOnAutoOffsetResetNoneEager(); } } 
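The ZookeeperOffsetHandler change earlier in this commit replaces the OFFSET_NOT_SET sentinel with a nullable Long: getOffsetFromZooKeeper now returns null when no offset is stored, and the updated Kafka08ITCase assertions check for null accordingly. A minimal sketch of the adapted caller pattern follows; it is illustrative only, and the curatorClient, groupId, topic, and partition variables are assumed to be set up as in the tests above.

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

    Long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
            curatorClient, groupId, topic, partition);
    if (offset != null) {
        // An offset was committed to ZooKeeper; per the fetch-request comment in
        // SimpleConsumerThread, the stored value is the last emitted record, so the
        // fetcher asks for offset + 1 next.
    } else {
        // Nothing committed yet; previously this case was signalled by the
        // FlinkKafkaConsumerBase.OFFSET_NOT_SET sentinel.
    }
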
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java deleted file mode 100644 index c99e133..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl; -import org.apache.flink.streaming.connectors.kafka.KafkaTestBase; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class ZookeeperOffsetHandlerTest extends KafkaTestBase { - - @Test - public void runOffsetManipulationinZooKeeperTest() { - try { - final String topicName = "ZookeeperOffsetHandlerTest-Topic"; - final String groupId = "ZookeeperOffsetHandlerTest-Group"; - - final long offset = (long) (Math.random() * Long.MAX_VALUE); - - CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient(); - kafkaServer.createTestTopic(topicName, 3, 2); - - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset); - - long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0); - - curatorFramework.close(); - - assertEquals(offset, fetchedOffset); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties index 6bdfb48..fbeb110 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties @@ -25,5 
+25,6 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n # suppress the irrelevant (wrong) warnings from the netty channel handler log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger - - +log4j.logger.org.apache.zookeeper=OFF, testlogger +log4j.logger.state.change.logger=OFF, testlogger +log4j.logger.kafka=OFF, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index 55f9875..d34cd2f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -17,38 +17,31 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator; import org.apache.flink.streaming.util.serialization.DeserializationSchema; - import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.util.SerializedValue; + import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; -import java.util.UUID; -import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from @@ -73,10 +66,8 @@ import static java.util.Objects.requireNonNull; */ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { - // 
------------------------------------------------------------------------ - private static final long serialVersionUID = 2324564345203409112L; - + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class); /** Configuration key to change the polling timeout **/ @@ -85,35 +76,18 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { /** Boolean configuration key to disable metrics tracking **/ public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; - /** - * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not - * available. If 0, returns immediately with any records that are available now. - */ + /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not + * available. If 0, returns immediately with any records that are available now. */ public static final long DEFAULT_POLL_TIMEOUT = 100L; + // ------------------------------------------------------------------------ + /** User-supplied properties for Kafka **/ private final Properties properties; - /** Ordered list of all partitions available in all subscribed partitions **/ - private final List<KafkaTopicPartition> partitionInfos; - - /** Unique ID identifying the consumer */ - private final String consumerId; - - // ------ Runtime State ------- - - /** The partitions actually handled by this consumer at runtime */ - private transient List<TopicPartition> subscribedPartitions; - /** For performance reasons, we are keeping two representations of the subscribed partitions **/ - private transient List<KafkaTopicPartition> subscribedPartitionsAsFlink; - /** The Kafka Consumer instance**/ - private transient KafkaConsumer<byte[], byte[]> consumer; - /** The thread running Kafka's consumer **/ - private transient ConsumerThread<T> consumerThread; - /** Exception set from the ConsumerThread */ - private transient Throwable consumerThreadException; - /** If the consumer doesn't have a Kafka partition assigned at runtime, it'll block on this waitThread **/ - private transient Thread waitThread; + /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not + * available. If 0, returns immediately with any records that are available now */ + private final long pollTimeout; // ------------------------------------------------------------------------ @@ -177,14 +151,30 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { * The properties that are used to configure both the fetcher and the offset handler. 
*/ public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { - super(deserializer, props); - requireNonNull(topics, "topics"); - this.properties = requireNonNull(props, "props"); + super(deserializer); + + checkNotNull(topics, "topics"); + this.properties = checkNotNull(props, "props"); setDeserializer(this.properties); + + // configure the polling timeout + try { + if (properties.containsKey(KEY_POLL_TIMEOUT)) { + this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT)); + } else { + this.pollTimeout = DEFAULT_POLL_TIMEOUT; + } + } + catch (Exception e) { + throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e); + } + + // read the partitions that belong to the listed topics + final List<KafkaTopicPartition> partitions = new ArrayList<>(); KafkaConsumer<byte[], byte[]> consumer = null; + try { consumer = new KafkaConsumer<>(this.properties); - this.partitionInfos = new ArrayList<>(); for (final String topic: topics) { // get partitions for each topic List<PartitionInfo> partitionsForTopic = null; @@ -203,307 +193,93 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { consumer.close(); try { Thread.sleep(1000); - } catch (InterruptedException e) { - } + } catch (InterruptedException ignored) {} + consumer = new KafkaConsumer<>(properties); } // for non existing topics, the list might be null. - if(partitionsForTopic != null) { - partitionInfos.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); + if (partitionsForTopic != null) { + partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); } } - } finally { + } + finally { if(consumer != null) { consumer.close(); } } - if(partitionInfos.isEmpty()) { + + if (partitions.isEmpty()) { throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics); } // we now have a list of partitions which is the same for all parallel consumer instances. - LOG.info("Got {} partitions from these topics: {}", partitionInfos.size(), topics); + LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics); if (LOG.isInfoEnabled()) { - logPartitionInfo(partitionInfos); + logPartitionInfo(LOG, partitions); } - this.consumerId = UUID.randomUUID().toString(); + // register these partitions + setSubscribedPartitions(partitions); + } + + @Override + protected AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> thisSubtaskPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception { + + boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false")); + + return new Kafka09Fetcher<>(sourceContext, thisSubtaskPartitions, + watermarksPeriodic, watermarksPunctuated, + runtimeContext, deserializer, + properties, pollTimeout, useMetrics); + } + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ /** * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable) + * * @param partitions A list of Kafka PartitionInfos. 
* @return A list of KafkaTopicPartitions */ - public static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) { - requireNonNull(partitions, "The given list of partitions was null"); + private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) { + checkNotNull(partitions); + List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size()); - for(PartitionInfo pi: partitions) { + for (PartitionInfo pi : partitions) { ret.add(new KafkaTopicPartition(pi.topic(), pi.partition())); } return ret; } - public static List<TopicPartition> convertToKafkaTopicPartition(List<KafkaTopicPartition> partitions) { - List<TopicPartition> ret = new ArrayList<>(partitions.size()); - for(KafkaTopicPartition ktp: partitions) { - ret.add(new TopicPartition(ktp.getTopic(), ktp.getPartition())); - } - return ret; - } - - // ------------------------------------------------------------------------ - // Source life cycle - // ------------------------------------------------------------------------ - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks(); - final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask(); - - // pick which partitions we work on - this.subscribedPartitionsAsFlink = assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex); - if(this.subscribedPartitionsAsFlink.isEmpty()) { - LOG.info("This consumer doesn't have any partitions assigned"); - this.partitionState = null; - return; - } else { - StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext(); - // if checkpointing is enabled, we are not automatically committing to Kafka. - properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(!streamingRuntimeContext.isCheckpointingEnabled())); - this.consumer = new KafkaConsumer<>(properties); - } - subscribedPartitions = convertToKafkaTopicPartition(subscribedPartitionsAsFlink); - - this.consumer.assign(this.subscribedPartitions); - - // register Kafka metrics to Flink accumulators - if(!Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"))) { - Map<MetricName, ? extends Metric> metrics = this.consumer.metrics(); - if(metrics == null) { - // MapR's Kafka implementation returns null here. - LOG.info("Consumer implementation does not support metrics"); - } else { - for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) { - String name = consumerId + "-consumer-" + metric.getKey().name(); - DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue()); - // best effort: we only add the accumulator if available. 
- if (kafkaAccumulator != null) { - getRuntimeContext().addAccumulator(name, kafkaAccumulator); - } - } - } - } - - // check if we need to explicitly seek to a specific offset (restore case) - if(restoreToOffset != null) { - // we are in a recovery scenario - for(Map.Entry<KafkaTopicPartition, Long> info: restoreToOffset.entrySet()) { - // seek all offsets to the right position - this.consumer.seek(new TopicPartition(info.getKey().getTopic(), info.getKey().getPartition()), info.getValue() + 1); - } - this.partitionState = restoreInfoFromCheckpoint(); - } else { - this.partitionState = new HashMap<>(); - } - } - - - - @Override - public void run(SourceContext<T> sourceContext) throws Exception { - if(consumer != null) { - consumerThread = new ConsumerThread<>(this, sourceContext); - consumerThread.setDaemon(true); - consumerThread.start(); - // wait for the consumer to stop - while(consumerThread.isAlive()) { - if(consumerThreadException != null) { - throw new RuntimeException("ConsumerThread threw an exception: " + consumerThreadException.getMessage(), consumerThreadException); - } - try { - consumerThread.join(50); - } catch (InterruptedException ie) { - consumerThread.shutdown(); - } - } - // check again for an exception - if(consumerThreadException != null) { - throw new RuntimeException("ConsumerThread threw an exception: " + consumerThreadException.getMessage(), consumerThreadException); - } - } else { - // this source never completes, so emit a Long.MAX_VALUE watermark - // to not block watermark forwarding - sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); - - final Object waitLock = new Object(); - this.waitThread = Thread.currentThread(); - while (running) { - // wait until we are canceled - try { - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (waitLock) { - waitLock.wait(); - } - } - catch (InterruptedException e) { - // do nothing, check our "running" status - } - } - } - // close the context after the work was done. 
this can actually only - // happen when the fetcher decides to stop fetching - sourceContext.close(); - } - - @Override - public void cancel() { - // set ourselves as not running - running = false; - if(this.consumerThread != null) { - this.consumerThread.shutdown(); - } else { - // the consumer thread is not running, so we have to interrupt our own thread - if(waitThread != null) { - waitThread.interrupt(); - } - } - } - - @Override - public void close() throws Exception { - cancel(); - super.close(); - } - - // ------------------------------------------------------------------------ - // Checkpoint and restore - // ------------------------------------------------------------------------ - - - @Override - protected void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) { - if(!running) { - LOG.warn("Unable to commit offsets on closed consumer"); - return; - } - Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = convertToCommitMap(checkpointOffsets); - synchronized (this.consumer) { - this.consumer.commitSync(kafkaCheckpointOffsets); - } - } - - public static Map<TopicPartition, OffsetAndMetadata> convertToCommitMap(HashMap<KafkaTopicPartition, Long> checkpointOffsets) { - Map<TopicPartition, OffsetAndMetadata> ret = new HashMap<>(checkpointOffsets.size()); - for(Map.Entry<KafkaTopicPartition, Long> partitionOffset: checkpointOffsets.entrySet()) { - ret.put(new TopicPartition(partitionOffset.getKey().getTopic(), partitionOffset.getKey().getPartition()), - new OffsetAndMetadata(partitionOffset.getValue(), "")); - } - return ret; - } - - // ------------------------------------------------------------------------ - // Miscellaneous utilities - // ------------------------------------------------------------------------ - - - protected static void setDeserializer(Properties props) { - if (!props.contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName()); - } else { - LOG.warn("Overwriting the '{}' is not recommended", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); - } - - if (!props.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName()); - } else { - LOG.warn("Overwriting the '{}' is not recommended", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); - } - } - /** - * We use a separate thread for executing the KafkaConsumer.poll(timeout) call because Kafka is not - * handling interrupts properly. On an interrupt (which happens automatically by Flink if the task - * doesn't react to cancel() calls), the poll() method might never return. - * On cancel, we'll wakeup the .poll() call and wait for it to return + * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties. + * + * @param props The Kafka properties to register the serializer in. 
*/ - private static class ConsumerThread<T> extends Thread { - private final FlinkKafkaConsumer09<T> flinkKafkaConsumer; - private final SourceContext<T> sourceContext; - private boolean running = true; - - public ConsumerThread(FlinkKafkaConsumer09<T> flinkKafkaConsumer, SourceContext<T> sourceContext) { - this.flinkKafkaConsumer = flinkKafkaConsumer; - this.sourceContext = sourceContext; - } + private static void setDeserializer(Properties props) { + final String deSerName = ByteArrayDeserializer.class.getCanonicalName(); - @Override - public void run() { - try { - long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT))); - pollLoop: while (running) { - ConsumerRecords<byte[], byte[]> records; - //noinspection SynchronizeOnNonFinalField - synchronized (flinkKafkaConsumer.consumer) { - try { - records = flinkKafkaConsumer.consumer.poll(pollTimeout); - } catch (WakeupException we) { - if (running) { - throw we; - } - // leave loop - continue; - } - } - // get the records for each topic partition - for (int i = 0; i < flinkKafkaConsumer.subscribedPartitions.size(); i++) { - TopicPartition partition = flinkKafkaConsumer.subscribedPartitions.get(i); - KafkaTopicPartition flinkPartition = flinkKafkaConsumer.subscribedPartitionsAsFlink.get(i); - List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition); - //noinspection ForLoopReplaceableByForEach - for (int j = 0; j < partitionRecords.size(); j++) { - ConsumerRecord<byte[], byte[]> record = partitionRecords.get(j); - T value = flinkKafkaConsumer.deserializer.deserialize(record.key(), record.value(), record.topic(), record.partition(),record.offset()); - if(flinkKafkaConsumer.deserializer.isEndOfStream(value)) { - // end of stream signaled - running = false; - break pollLoop; - } - synchronized (sourceContext.getCheckpointLock()) { - flinkKafkaConsumer.processElement(sourceContext, flinkPartition, value, record.offset()); - } - } - } - } - } catch(Throwable t) { - if(running) { - this.flinkKafkaConsumer.stopWithError(t); - } else { - LOG.debug("Stopped ConsumerThread threw exception", t); - } - } finally { - try { - flinkKafkaConsumer.consumer.close(); - } catch(Throwable t) { - LOG.warn("Error while closing consumer", t); - } - } - } + Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); - /** - * Try to shutdown the thread - */ - public void shutdown() { - this.running = false; - this.flinkKafkaConsumer.consumer.wakeup(); + if (keyDeSer != null && !keyDeSer.equals(deSerName)) { + LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + } + if (valDeSer != null && !valDeSer.equals(deSerName)) { + LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); } - } - private void stopWithError(Throwable t) { - this.consumerThreadException = t; + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName); } }
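Taken together, the FlinkKafkaConsumer09 changes move all broker communication for partition discovery into the constructor and defer the actual reading to a Kafka09Fetcher created in createFetcher(); the poll timeout and the metrics switch are plain entries in the user-supplied Properties. A minimal usage sketch under those assumptions (broker address, group id and topic name are placeholders, and actually running it needs a reachable broker because partitions are fetched eagerly in the constructor):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Kafka09ConsumerUsage {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.setProperty("group.id", "example-group");           // placeholder group

            // Optional connector keys read by the new code path:
            props.setProperty(FlinkKafkaConsumer09.KEY_POLL_TIMEOUT, "250");     // parsed with Long.parseLong in the constructor
            props.setProperty(FlinkKafkaConsumer09.KEY_DISABLE_METRICS, "true"); // checked in createFetcher()

            // The constructor queries the brokers for the partitions of the listed
            // topics and throws a RuntimeException if none can be retrieved.
            FlinkKafkaConsumer09<String> source = new FlinkKafkaConsumer09<>(
                    Collections.singletonList("example-topic"),
                    new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()),
                    props);
        }
    }

Setting KEY_DISABLE_METRICS to "true" flips useMetrics to false in createFetcher(), so the Kafka09Fetcher is created without forwarding Kafka's own metrics.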
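One detail worth calling out in the rewritten setDeserializer(Properties): the old code checked props.contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), but java.util.Properties inherits contains(Object) from Hashtable, where it searches values rather than keys, so a user-configured deserializer was never detected. The new version reads the configured value with get(...), warns that it will be ignored, and always installs ByteArrayDeserializer, since the connector itself consumes byte[] records and applies the Flink deserialization schema. A small self-contained demonstration of the pitfall (the property value is a hypothetical class name):

    import java.util.Properties;

    public class PropertiesContainsPitfall {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("key.deserializer", "org.example.MyDeserializer"); // hypothetical class

            // Hashtable.contains(Object) searches VALUES, not keys, so this prints "false"
            // even though the key is set -- which is why the old check never fired.
            System.out.println(props.contains("key.deserializer"));

            // containsKey / get are the correct lookups, as used in the rewritten method.
            System.out.println(props.containsKey("key.deserializer")); // true
        }
    }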