This is an automated email from the ASF dual-hosted git repository.
ymdavis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c604923 NIFI-6947: Add PutRecord processor using RecordSinkService
(#3943)
c604923 is described below
commit c604923c0b20533299cbc2fa7e88dbfc0d790f80
Author: Matthew Burgess <[email protected]>
AuthorDate: Wed Jan 8 03:25:14 2020 -0500
NIFI-6947: Add PutRecord processor using RecordSinkService (#3943)
* NIFI-6947: Add PutRecord processor using RecordSinkService
* NIFI-6947: Incorporated review comments
---
.../nifi-standard-processors/pom.xml | 4 +
.../apache/nifi/processors/standard/PutRecord.java | 192 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../processors/standard/MockRecordSinkService.java | 92 ++++++++++
.../nifi/processors/standard/TestPutRecord.java | 150 ++++++++++++++++
.../nifi/record/sink/RetryableIOException.java | 41 +++++
6 files changed, 480 insertions(+)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 72cdb65..9c77464 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -82,6 +82,10 @@
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-sink-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutRecord.java
new file mode 100644
index 0000000..180d44b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutRecord.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.record.sink.RetryableIOException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"record", "put", "sink"})
+@CapabilityDescription("The PutRecord processor uses a specified RecordReader
to input (possibly multiple) records from an incoming flow file, and sends them
"
+ + "to a destination specified by a Record Destination Service (i.e.
record sink).")
+public class PutRecord extends AbstractProcessor {
+
+ static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("put-record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for reading
incoming data")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor RECORD_SINK = new
PropertyDescriptor.Builder()
+ .name("put-record-sink")
+ .displayName("Record Destination Service")
+ .description("Specifies the Controller Service to use for writing
out the query result records to some destination.")
+ .identifiesControllerService(RecordSinkService.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new
PropertyDescriptor.Builder()
+ .name("put-record-include-zero-record-results")
+ .displayName("Include Zero Record Results")
+ .description("If no records are read from the incoming FlowFile,
this property specifies whether or not an empty record set will be transmitted.
The original "
+ + "FlowFile will still be routed to success, but if no
transmission occurs, no provenance SEND event will be generated.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ // Relationships
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("The original FlowFile will be routed to this
relationship if the records were transmitted successfully")
+ .build();
+
+ static final Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("The original FlowFile is routed to this relationship
if the records could not be transmitted but attempting the operation again may
succeed")
+ .build();
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if the
records could not be transmitted and retrying the operation will also fail")
+ .build();
+
+ private static final List<PropertyDescriptor> properties;
+ private static final Set<Relationship> relationships;
+
+ private volatile RecordSinkService recordSinkService;
+
+ static {
+ final List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(RECORD_READER);
+ props.add(RECORD_SINK);
+ props.add(INCLUDE_ZERO_RECORD_RESULTS);
+ properties = Collections.unmodifiableList(props);
+
+ final Set<Relationship> r = new HashSet<>();
+ r.add(REL_SUCCESS);
+ r.add(REL_FAILURE);
+ r.add(REL_RETRY);
+ relationships = Collections.unmodifiableSet(r);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ recordSinkService =
context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class);
+ recordSinkService.reset();
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+ final StopWatch stopWatch = new StopWatch(true);
+
+ RecordSet recordSet;
+ try (final InputStream in = session.read(flowFile)) {
+
+ final RecordReaderFactory recordParserFactory =
context.getProperty(RECORD_READER)
+ .asControllerService(RecordReaderFactory.class);
+ final RecordReader recordParser =
recordParserFactory.createRecordReader(flowFile, in, getLogger());
+ recordSet = recordParser.createRecordSet();
+
+ final boolean transmitZeroRecords =
context.getProperty(INCLUDE_ZERO_RECORD_RESULTS).asBoolean();
+ final WriteResult writeResult =
recordSinkService.sendData(recordSet, new HashMap<>(flowFile.getAttributes()),
transmitZeroRecords);
+ String recordSinkURL =
writeResult.getAttributes().get("record.sink.url");
+ if (StringUtils.isEmpty(recordSinkURL)) {
+ recordSinkURL = "unknown://";
+ }
+
+ final long transmissionMillis =
stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+ // Only record provenance if we sent any records
+ if (writeResult.getRecordCount() > 0 || transmitZeroRecords) {
+ session.getProvenanceReporter().send(flowFile, recordSinkURL,
transmissionMillis);
+ }
+
+ } catch (RetryableIOException rioe) {
+ getLogger().warn("Error during transmission of records due to {},
routing to retry", new Object[]{rioe.getMessage()}, rioe);
+ session.transfer(flowFile, REL_RETRY);
+ return;
+ } catch (SchemaNotFoundException snfe) {
+ throw new ProcessException("Error determining schema of flowfile
records: " + snfe.getMessage(), snfe);
+ } catch (MalformedRecordException e) {
+ getLogger().error("Error reading records from {} due to {},
routing to failure", new Object[]{flowFile, e.getMessage()}, e);
+ session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ } catch (IOException ioe) {
+ // The cause might be a MalformedRecordException (RecordReader
wraps it in an IOException), send to failure in that case
+ if (ioe.getCause() instanceof MalformedRecordException) {
+ getLogger().error("Error reading records from {} due to {},
routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+ session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ throw new ProcessException("Error reading from flowfile input
stream: " + ioe.getMessage(), ioe);
+ } catch (Exception e) {
+ getLogger().error("Error during transmission of records due to {},
routing to failure", new Object[]{e.getMessage()}, e);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 37bf6c2..f2c4f64 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -91,6 +91,7 @@ org.apache.nifi.processors.standard.PutEmail
org.apache.nifi.processors.standard.PutFile
org.apache.nifi.processors.standard.PutFTP
org.apache.nifi.processors.standard.PutJMS
+org.apache.nifi.processors.standard.PutRecord
org.apache.nifi.processors.standard.PutSFTP
org.apache.nifi.processors.standard.PutSQL
org.apache.nifi.processors.standard.PutSyslog
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockRecordSinkService.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockRecordSinkService.java
new file mode 100644
index 0000000..13d86ee
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockRecordSinkService.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.record.sink.RetryableIOException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockRecordSinkService extends AbstractConfigurableComponent
implements RecordSinkService {
+
+ private List<Map<String, Object>> rows = new ArrayList<>();
+ private boolean transmitted = false;
+ private boolean failWithRetryableError = false;
+
+ public MockRecordSinkService() {
+ }
+
+ public MockRecordSinkService(boolean failWithRetryableError) {
+ this();
+ this.failWithRetryableError = failWithRetryableError;
+ }
+
+ @Override
+ public WriteResult sendData(RecordSet recordSet, Map<String, String>
attributes, boolean sendZeroResults) throws IOException {
+ if (failWithRetryableError) {
+ throw new RetryableIOException("Retryable");
+ }
+ int numRecordsWritten = 0;
+ RecordSchema recordSchema = recordSet.getSchema();
+ Record record;
+ while ((record = recordSet.next()) != null) {
+ Map<String, Object> row = new HashMap<>();
+ final Record finalRecord = record;
+ recordSchema.getFieldNames().forEach((fieldName) ->
row.put(fieldName, finalRecord.getValue(fieldName)));
+ rows.add(row);
+ numRecordsWritten++;
+ }
+
+ if (numRecordsWritten > 0 || sendZeroResults) {
+ transmitted = true;
+ }
+ return WriteResult.of(numRecordsWritten, Collections.emptyMap());
+ }
+
+ @Override
+ public String getIdentifier() {
+ return "MockRecordSinkService";
+ }
+
+ @Override
+ public void initialize(ControllerServiceInitializationContext context)
throws InitializationException {
+ }
+
+ public List<Map<String, Object>> getRows() {
+ return rows;
+ }
+
+ public boolean isTransmitted() {
+ return transmitted;
+ }
+
+ public void setFailWithRetryableError(boolean failWithRetryableError) {
+ this.failWithRetryableError = failWithRetryableError;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutRecord.java
new file mode 100644
index 0000000..3a3c839
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutRecord.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestPutRecord {
+
+ private TestRunner testRunner;
+ private MockRecordParser recordReader;
+ private MockRecordSinkService mockRecordSinkService;
+
+ @Before
+ public void setup() {
+ PutRecord processor = new PutRecord();
+ testRunner = TestRunners.newTestRunner(processor);
+ recordReader = new MockRecordParser();
+ testRunner.setProperty(PutRecord.RECORD_READER, "reader");
+ mockRecordSinkService = new MockRecordSinkService();
+ testRunner.setProperty(PutRecord.RECORD_SINK, "MockRecordSinkService");
+ }
+
+ @Test
+ public void testSimplePut() throws Exception {
+ testRunner.addControllerService("reader", recordReader);
+ testRunner.enableControllerService(recordReader);
+
+ testRunner.addControllerService("MockRecordSinkService",
mockRecordSinkService);
+ testRunner.enableControllerService(mockRecordSinkService);
+
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+ recordReader.addRecord("John Doe", 48, "Soccer");
+ recordReader.addRecord("Jane Doe", 47, "Tennis");
+ recordReader.addRecord("Sally Doe", 47, "Curling");
+ recordReader.addRecord("Jimmy Doe", 14, null);
+ recordReader.addRecord("Pizza Doe", 14, null);
+
+ testRunner.enqueue("");
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(PutRecord.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void testNoRows() throws Exception {
+ testRunner.addControllerService("reader", recordReader);
+ testRunner.enableControllerService(recordReader);
+
+ testRunner.addControllerService("MockRecordSinkService",
mockRecordSinkService);
+ testRunner.enableControllerService(mockRecordSinkService);
+
+ testRunner.setProperty(PutRecord.INCLUDE_ZERO_RECORD_RESULTS, "false");
+
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+ testRunner.enqueue("");
+ testRunner.run();
+
+ assertTrue(mockRecordSinkService.getRows().isEmpty());
+ assertFalse(mockRecordSinkService.isTransmitted());
+ // Original flow file is still transferred
+ testRunner.assertAllFlowFilesTransferred(PutRecord.REL_SUCCESS, 1);
+ testRunner.clearTransferState();
+
+ // Send an empty record set anyway
+ testRunner.setProperty(PutRecord.INCLUDE_ZERO_RECORD_RESULTS, "true");
+ testRunner.enqueue("");
+ testRunner.run();
+
+ assertTrue(mockRecordSinkService.getRows().isEmpty());
+ assertTrue(mockRecordSinkService.isTransmitted());
+ // Original flow file is still transferred
+ testRunner.assertAllFlowFilesTransferred(PutRecord.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void testBadRecords() throws Exception {
+ recordReader = new MockRecordParser(1);
+ testRunner.addControllerService("reader", recordReader);
+ testRunner.enableControllerService(recordReader);
+
+ testRunner.addControllerService("MockRecordSinkService",
mockRecordSinkService);
+ testRunner.enableControllerService(mockRecordSinkService);
+
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+ recordReader.addRecord("John Doe", 48, "Soccer");
+ recordReader.addRecord("Jane Doe", 47, "Tennis");
+ recordReader.addRecord("Sally Doe", 47, "Curling");
+ recordReader.addRecord("Jimmy Doe", 14, null);
+ recordReader.addRecord("Pizza Doe", 14, null);
+
+ testRunner.enqueue("");
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(PutRecord.REL_FAILURE, 1);
+ }
+
+ @Test
+ public void testRetryableError() throws Exception {
+ recordReader = new MockRecordParser();
+ testRunner.addControllerService("reader", recordReader);
+ testRunner.enableControllerService(recordReader);
+
+ mockRecordSinkService.setFailWithRetryableError(true);
+ testRunner.addControllerService("MockRecordSinkService",
mockRecordSinkService);
+ testRunner.enableControllerService(mockRecordSinkService);
+
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+ recordReader.addRecord("John Doe", 48, "Soccer");
+
+ testRunner.enqueue("");
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(PutRecord.REL_RETRY, 1);
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-api/src/main/java/org/apache/nifi/record/sink/RetryableIOException.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-api/src/main/java/org/apache/nifi/record/sink/RetryableIOException.java
new file mode 100644
index 0000000..9fd4a65
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-api/src/main/java/org/apache/nifi/record/sink/RetryableIOException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.sink;
+
+import java.io.IOException;
+
+/**
+ * This is a marker class for IOExceptions that may succeed if the operation
is performed again, it is a hint to retry the operation
+ */
+public class RetryableIOException extends IOException {
+
+ public RetryableIOException() {
+ super();
+ }
+
+ public RetryableIOException(String message) {
+ super(message);
+ }
+
+ public RetryableIOException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RetryableIOException(Throwable cause) {
+ super(cause);
+ }
+}