Repository: flink Updated Branches: refs/heads/release-1.4 8a052bf09 -> 62bf00189
[hotfix][kafka] Throw FlinkKafkaProducer011Exception with error codes instead of generic Exception Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/736b9088 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/736b9088 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/736b9088 Branch: refs/heads/release-1.4 Commit: 736b9088dcee64a1d3b19575f29a80c377f94fb8 Parents: 8a052bf Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Wed Nov 22 11:37:48 2017 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Nov 23 14:45:00 2017 +0100 ---------------------------------------------------------------------- .../kafka/FlinkKafka011ErrorCode.java | 26 ++++++++++++ .../kafka/FlinkKafka011Exception.java | 42 ++++++++++++++++++++ .../connectors/kafka/FlinkKafkaProducer011.java | 22 +++++----- 3 files changed, 81 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/736b9088/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java new file mode 100644 index 0000000..4f5de4f --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +/** + * Error codes used in {@link FlinkKafka011Exception}. + */ +public enum FlinkKafka011ErrorCode { + PRODUCERS_POOL_EMPTY, + EXTERNAL_ERROR +} http://git-wip-us.apache.org/repos/asf/flink/blob/736b9088/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java new file mode 100644 index 0000000..6b16e53 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.util.FlinkException; + +/** + * Exception used by {@link FlinkKafkaProducer011} and {@link FlinkKafkaConsumer011}. + */ +public class FlinkKafka011Exception extends FlinkException { + + private final FlinkKafka011ErrorCode errorCode; + + public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message) { + super(message); + this.errorCode = errorCode; + } + + public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message, Throwable cause) { + super(message, cause); + this.errorCode = errorCode; + } + + public FlinkKafka011ErrorCode getErrorCode() { + return errorCode; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/736b9088/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 6b0136d..0c741f5 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -551,7 +551,7 @@ public class FlinkKafkaProducer011<IN> } @Override - public void invoke(KafkaTransactionState transaction, IN next, Context context) throws Exception { + public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception { checkErroneous(); byte[] serializedKey = schema.serializeKey(next); @@ -587,7 +587,7 @@ public class FlinkKafkaProducer011<IN> } @Override - public void close() throws Exception { + public void close() throws FlinkKafka011Exception { final KafkaTransactionState currentTransaction = currentTransaction(); if (currentTransaction != null) { // to avoid exceptions on aborting transactions with some pending records @@ -612,7 +612,7 @@ public class FlinkKafkaProducer011<IN> // ------------------- Logic for handling checkpoint flushing -------------------------- // @Override - protected KafkaTransactionState beginTransaction() throws Exception { + protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception { switch (semantic) { case EXACTLY_ONCE: FlinkKafkaProducer<byte[], byte[]> producer = createOrGetProducerFromPool(); @@ -631,12 +631,13 @@ public class FlinkKafkaProducer011<IN> } } - private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws Exception { + private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws FlinkKafka011Exception { FlinkKafkaProducer<byte[], byte[]> producer = getProducersPool().poll(); if (producer == null) { String transactionalId = availableTransactionalIds.poll(); if (transactionalId == null) { - throw new Exception( + throw new FlinkKafka011Exception( + FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints."); } producer = initTransactionalProducer(transactionalId, true); @@ -646,7 +647,7 @@ public class FlinkKafkaProducer011<IN> } @Override - protected void preCommit(KafkaTransactionState transaction) throws Exception { + protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception { switch (semantic) { case EXACTLY_ONCE: case AT_LEAST_ONCE: @@ -740,7 +741,7 @@ public class FlinkKafkaProducer011<IN> * Flush pending records. * @param transaction */ - private void flush(KafkaTransactionState transaction) throws Exception { + private void flush(KafkaTransactionState transaction) throws FlinkKafka011Exception { if (transaction.producer != null) { transaction.producer.flush(); } @@ -936,12 +937,15 @@ public class FlinkKafkaProducer011<IN> return producer; } - private void checkErroneous() throws Exception { + private void checkErroneous() throws FlinkKafka011Exception { Exception e = asyncException; if (e != null) { // prevent double throwing asyncException = null; - throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e); + throw new FlinkKafka011Exception( + FlinkKafka011ErrorCode.EXTERNAL_ERROR, + "Failed to send data to Kafka: " + e.getMessage(), + e); } }