Repository: flink
Updated Branches:
  refs/heads/release-1.4 8a052bf09 -> 62bf00189


[hotfix][kafka] Throw FlinkKafka011Exception with error codes instead of generic Exception


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/736b9088
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/736b9088
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/736b9088

Branch: refs/heads/release-1.4
Commit: 736b9088dcee64a1d3b19575f29a80c377f94fb8
Parents: 8a052bf
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Wed Nov 22 11:37:48 2017 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Nov 23 14:45:00 2017 +0100

----------------------------------------------------------------------
 .../kafka/FlinkKafka011ErrorCode.java           | 26 ++++++++++++
 .../kafka/FlinkKafka011Exception.java           | 42 ++++++++++++++++++++
 .../connectors/kafka/FlinkKafkaProducer011.java | 22 +++++-----
 3 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/736b9088/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
new file mode 100644
index 0000000..4f5de4f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+/**
+ * Error codes used in {@link FlinkKafka011Exception}.
+ */
+public enum FlinkKafka011ErrorCode {
+       PRODUCERS_POOL_EMPTY,
+       EXTERNAL_ERROR
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/736b9088/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
new file mode 100644
index 0000000..6b16e53
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception used by {@link FlinkKafkaProducer011} and {@link FlinkKafkaConsumer011}.
+ */
+public class FlinkKafka011Exception extends FlinkException {
+
+       private final FlinkKafka011ErrorCode errorCode;
+
+       public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String 
message) {
+               super(message);
+               this.errorCode = errorCode;
+       }
+
+       public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String 
message, Throwable cause) {
+               super(message, cause);
+               this.errorCode = errorCode;
+       }
+
+       public FlinkKafka011ErrorCode getErrorCode() {
+               return errorCode;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/736b9088/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 6b0136d..0c741f5 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -551,7 +551,7 @@ public class FlinkKafkaProducer011<IN>
        }
 
        @Override
-       public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws Exception {
+       public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafka011Exception {
                checkErroneous();
 
                byte[] serializedKey = schema.serializeKey(next);
@@ -587,7 +587,7 @@ public class FlinkKafkaProducer011<IN>
        }
 
        @Override
-       public void close() throws Exception {
+       public void close() throws FlinkKafka011Exception {
                final KafkaTransactionState currentTransaction = 
currentTransaction();
                if (currentTransaction != null) {
                        // to avoid exceptions on aborting transactions with some pending records
@@ -612,7 +612,7 @@ public class FlinkKafkaProducer011<IN>
        // ------------------- Logic for handling checkpoint flushing -------------------------- //
 
        @Override
-       protected KafkaTransactionState beginTransaction() throws Exception {
+       protected KafkaTransactionState beginTransaction() throws 
FlinkKafka011Exception {
                switch (semantic) {
                        case EXACTLY_ONCE:
                                FlinkKafkaProducer<byte[], byte[]> producer = 
createOrGetProducerFromPool();
@@ -631,12 +631,13 @@ public class FlinkKafkaProducer011<IN>
                }
        }
 
-       private FlinkKafkaProducer<byte[], byte[]> 
createOrGetProducerFromPool() throws Exception {
+       private FlinkKafkaProducer<byte[], byte[]> 
createOrGetProducerFromPool() throws FlinkKafka011Exception {
                FlinkKafkaProducer<byte[], byte[]> producer = 
getProducersPool().poll();
                if (producer == null) {
                        String transactionalId = 
availableTransactionalIds.poll();
                        if (transactionalId == null) {
-                               throw new Exception(
+                               throw new FlinkKafka011Exception(
+                                       
FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
                                        "Too many ongoing snapshots. Increase 
kafka producers pool size or decrease number of concurrent checkpoints.");
                        }
                        producer = initTransactionalProducer(transactionalId, 
true);
@@ -646,7 +647,7 @@ public class FlinkKafkaProducer011<IN>
        }
 
        @Override
-       protected void preCommit(KafkaTransactionState transaction) throws 
Exception {
+       protected void preCommit(KafkaTransactionState transaction) throws 
FlinkKafka011Exception {
                switch (semantic) {
                        case EXACTLY_ONCE:
                        case AT_LEAST_ONCE:
@@ -740,7 +741,7 @@ public class FlinkKafkaProducer011<IN>
         * Flush pending records.
         * @param transaction
         */
-       private void flush(KafkaTransactionState transaction) throws Exception {
+       private void flush(KafkaTransactionState transaction) throws 
FlinkKafka011Exception {
                if (transaction.producer != null) {
                        transaction.producer.flush();
                }
@@ -936,12 +937,15 @@ public class FlinkKafkaProducer011<IN>
                return producer;
        }
 
-       private void checkErroneous() throws Exception {
+       private void checkErroneous() throws FlinkKafka011Exception {
                Exception e = asyncException;
                if (e != null) {
                        // prevent double throwing
                        asyncException = null;
-                       throw new Exception("Failed to send data to Kafka: " + 
e.getMessage(), e);
+                       throw new FlinkKafka011Exception(
+                               FlinkKafka011ErrorCode.EXTERNAL_ERROR,
+                               "Failed to send data to Kafka: " + 
e.getMessage(),
+                               e);
                }
        }
 

Reply via email to