Repository: nifi Updated Branches: refs/heads/master 85405dae1 -> af6f63691
NIFI-3818: PutHiveStreaming throws IllegalStateException Changed from async append to sync as it breaks 'recursionSet' check in StandardProcessSession by updating it from multiple threads, resulting in an IllegalStateException. This closes #1761. Signed-off-by: Bryan Bende <bbe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/af6f6369 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/af6f6369 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/af6f6369 Branch: refs/heads/master Commit: af6f63691cdeee802da7f6d9ddf8b21b2bc40760 Parents: 85405da Author: Koji Kawamura <ijokaruma...@apache.org> Authored: Fri May 5 18:33:38 2017 +0900 Committer: Bryan Bende <bbe...@apache.org> Committed: Fri May 5 13:25:59 2017 -0400 ---------------------------------------------------------------------- .../nifi/processors/hive/PutHiveStreaming.java | 120 +++++++------------ 1 file changed, 45 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/af6f6369/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java index 2754f9c..e7d85cd 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java @@ -21,6 +21,7 @@ import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileConstants; import 
org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.SeekableByteArrayInput; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -64,6 +65,7 @@ import org.apache.nifi.util.hive.HiveOptions; import org.apache.nifi.util.hive.HiveUtils; import org.apache.nifi.util.hive.HiveWriter; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -75,8 +77,6 @@ import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -85,7 +85,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.regex.Pattern; /** @@ -383,16 +382,13 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { private AtomicReference<FlowFile> failureFlowFile; private final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>()); private final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>()); + private byte[] successAvroHeader; + private byte[] failureAvroHeader; private final AtomicInteger recordCount = new AtomicInteger(0); private final AtomicInteger successfulRecordCount = new AtomicInteger(0); private final AtomicInteger failedRecordCount = new AtomicInteger(0); - private volatile ExecutorService appendRecordThreadPool; - private volatile AtomicBoolean closed = new AtomicBoolean(false); - private 
final BlockingQueue<List<HiveStreamingRecord>> successRecordQueue = new ArrayBlockingQueue<>(100); - private final BlockingQueue<List<HiveStreamingRecord>> failureRecordQueue = new ArrayBlockingQueue<>(100); - private final ComponentLog logger; /** @@ -412,9 +408,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { this.failureFlowFile = new AtomicReference<>(failureFlowFile); } - private void initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader, - DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef, - BlockingQueue<List<HiveStreamingRecord>> queue, Function<Integer, Boolean> isCompleted) { + private byte[] initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader, + DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef) { writer.setCodec(CodecFactory.fromString(codec)); // Transfer metadata (this is a subset of the incoming file) @@ -424,71 +419,59 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { } } - appendRecordThreadPool.submit(() -> { - flowFileRef.set(session.append(flowFileRef.get(), (out) -> { - // Create writer so that records can be appended. - writer.create(reader.getSchema(), out); - - try { - int writtenCount = 0; - while (true) { - - if (closed.get() && isCompleted.apply(writtenCount)) { - break; - } + final ByteArrayOutputStream avroHeader = new ByteArrayOutputStream(); + flowFileRef.set(session.append(flowFileRef.get(), (out) -> { + // Create writer so that records can be appended later. 
+ writer.create(reader.getSchema(), avroHeader); + writer.close(); - final List<HiveStreamingRecord> hRecords = queue.poll(100, TimeUnit.MILLISECONDS); - if (hRecords != null) { - try { - for (HiveStreamingRecord hRecord : hRecords) { - writer.append(hRecord.getRecord()); - writtenCount++; - } - } catch (IOException ioe) { - // The records were put to Hive Streaming successfully, but there was an error while writing the - // Avro records to the flow file. Log as an error and move on. - logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe); - } - } - } - writer.flush(); - } catch (InterruptedException e) { - logger.warn("Append record thread is interrupted, " + e, e); - } + final byte[] header = avroHeader.toByteArray(); + out.write(header); + })); - })); - }); + // Capture the Avro header byte array that is just written to the FlowFile. + // This is needed when Avro records are appended to the same FlowFile. + return avroHeader.toByteArray(); } private void initAvroWriters(ProcessSession session, String codec, DataFileStream<GenericRecord> reader) { - appendRecordThreadPool = Executors.newFixedThreadPool(2); - initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile, successRecordQueue, w -> w == successfulRecordCount.get()); - initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile, failureRecordQueue, w -> w == failedRecordCount.get()); + successAvroHeader = initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile); + failureAvroHeader = initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile); + } + + private void appendAvroRecords(ProcessSession session, byte[] avroHeader, DataFileWriter<GenericRecord> writer, + AtomicReference<FlowFile> flowFileRef, List<HiveStreamingRecord> hRecords) { - // No new task. 
- appendRecordThreadPool.shutdown(); + flowFileRef.set(session.append(flowFileRef.get(), (out) -> { + if (hRecords != null) { + // Initialize the writer again as append mode, so that Avro header is written only once. + writer.appendTo(new SeekableByteArrayInput(avroHeader), out); + try { + for (HiveStreamingRecord hRecord : hRecords) { + writer.append(hRecord.getRecord()); + } + } catch (IOException ioe) { + // The records were put to Hive Streaming successfully, but there was an error while writing the + // Avro records to the flow file. Log as an error and move on. + logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe); + } + } + writer.close(); + })); } - private void appendRecordsToSuccess(List<HiveStreamingRecord> records) { - appendRecordsToFlowFile(records, successRecordQueue); + private void appendRecordsToSuccess(ProcessSession session, List<HiveStreamingRecord> records) { + appendAvroRecords(session, successAvroHeader, successAvroWriter, successFlowFile, records); successfulRecordCount.addAndGet(records.size()); } - private void appendRecordsToFailure(List<HiveStreamingRecord> records) { - appendRecordsToFlowFile(records, failureRecordQueue); + private void appendRecordsToFailure(ProcessSession session, List<HiveStreamingRecord> records) { + appendAvroRecords(session, failureAvroHeader, failureAvroWriter, failureFlowFile, records); failedRecordCount.addAndGet(records.size()); } - private void appendRecordsToFlowFile(List<HiveStreamingRecord> records, BlockingQueue<List<HiveStreamingRecord>> queue) { - if (!queue.add(records)) { - throw new ProcessException(String.format("Failed to append %d records due to insufficient internal queue capacity.", records.size())); - } - } - private void transferFlowFiles(ProcessSession session, RoutingResult result, String transitUri) { - closeAvroWriters(); - if (successfulRecordCount.get() > 0) { // Transfer the flow file with successful records 
successFlowFile.set( @@ -513,19 +496,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { }); } - private void closeAvroWriters() { - closed.set(true); - if (appendRecordThreadPool != null) { - // Having null thread pool means the input FlowFile was not processed at all, due to illegal format. - try { - if (!appendRecordThreadPool.awaitTermination(10, TimeUnit.SECONDS)) { - logger.warn("Waiting for Avro records being appended into output FlowFiles has been timeout."); - } - } catch (InterruptedException e) { - logger.warn("Waiting for Avro records being appended into output FlowFiles has been interrupted."); - } - } - } } private static class ShouldRetryException extends RuntimeException { @@ -545,7 +515,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { case Failure: // Add the failed record to the failure flow file getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), e); - fc.appendRecordsToFailure(input); + fc.appendRecordsToFailure(session, input); break; case Retry: @@ -670,7 +640,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { Runnable flushSuccessfulRecords = () -> { // Now send the records to the successful FlowFile and update the success count - functionContext.appendRecordsToSuccess(successfulRecords.get()); + functionContext.appendRecordsToSuccess(session, successfulRecords.get()); // Clear the list of successful records, we'll use it at the end when we flush whatever records are left successfulRecords.set(new ArrayList<>()); };