Till Westmann has submitted this change and it was merged. Change subject: small refactoring ......................................................................
small refactoring Change-Id: I37eab1645416e3aad6119bba527c5e3b4b98fddc Reviewed-on: https://asterix-gerrit.ics.uci.edu/1052 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java 1 file changed, 21 insertions(+), 15 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 4189dbf..10e9125 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -54,7 +54,7 @@ protected final boolean sendMarker; protected boolean failed = false; private FeedRecordDataFlowController<T>.DataflowMarker dataflowMarker; - private Future<?> result; + private Future<?> dataflowMarkerResult; public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser, @@ -69,12 +69,7 @@ @Override public void start(IFrameWriter writer) throws HyracksDataException { - ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null; - if (sendMarker && dataflowMarker == null) { - dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(), - TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx)); - result = executorService.submit(dataflowMarker); - } + startDataflowMarker(); HyracksDataException hde = null; try { failed = false; @@ -102,9 +97,7 @@ LOGGER.warn("Failure while operating a feed source", e); throw new HyracksDataException(e); } - if(dataflowMarker != null){ - dataflowMarker.stop(); - } + stopDataflowMarker(); try { tupleForwarder.close(); } catch (Throwable th) { @@ -117,8 +110,8 @@ hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th); } finally { closeSignal(); - if (sendMarker && result != null) { - result.cancel(true); + if (sendMarker && dataflowMarkerResult != null) { + dataflowMarkerResult.cancel(true); } } if (hde != null) { @@ -149,6 +142,21 @@ protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException { } + private void startDataflowMarker() { + ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null; + if (sendMarker && dataflowMarker == null) { + dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(), + TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx)); + dataflowMarkerResult = executorService.submit(dataflowMarker); + } + } + + private void stopDataflowMarker() { + if (dataflowMarker != null) { + dataflowMarker.stop(); + } + } + private void closeSignal() { synchronized (closed) { closed.set(true); @@ -166,9 +174,7 @@ @Override public boolean stop() throws HyracksDataException { - if (dataflowMarker != null) { - dataflowMarker.stop(); - } + stopDataflowMarker(); HyracksDataException hde = null; if (recordReader.stop()) { if (failed) { -- To view, visit https://asterix-gerrit.ics.uci.edu/1052 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I37eab1645416e3aad6119bba527c5e3b4b98fddc Gerrit-PatchSet: 6 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>