>From Michael Blow <[email protected]>: Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20443?usp=email )
Change subject: [NO ISSUE][*DB][EXT] Make IDataParser extend Closeable, misc ...................................................................... [NO ISSUE][*DB][EXT] Make IDataParser extend Closeable, misc Ext-ref: MB-68827 Change-Id: Id8d97f948d048e5c204bb0b78c089420b76dddc9 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20443 Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Tested-by: Michael Blow <[email protected]> --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java 8 files changed, 58 insertions(+), 9 deletions(-) Approvals: Hussain Towaileb: Looks good to me, approved Michael Blow: Looks good to me, but someone else must approve; Verified diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index a75ee33..0a3ce80 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -192,6 +192,7 @@ + " this node before the cluster controller sent the stop request"); } else { executor.execute(() -> { + Thread.currentThread().setName(runtimeId.toString()); try { stopIfRunning(runtime, content.getTimeout(), content.getUnit()); } catch (Throwable th) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java index 5dbc383..d3a7448 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.external.api; +import java.io.Closeable; import java.io.DataOutput; import org.apache.asterix.builders.IARecordBuilder; @@ -37,14 +38,18 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -public interface IDataParser { +public interface IDataParser extends Closeable { + @Override + default void close() { + // Default no-op + } /* * The following two static methods are expensive. right now, they are used by RSSFeeds and * Twitter feed * TODO: Get rid of them */ - public static void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder) + static void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder) throws HyracksDataException { ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage(); int numFields = record.getType().getFieldNames().length; @@ -58,7 +63,7 @@ } @SuppressWarnings("unchecked") - public static void writeObject(IAObject obj, DataOutput dataOutput) throws HyracksDataException { + static void writeObject(IAObject obj, DataOutput dataOutput) throws HyracksDataException { switch (obj.getType().getTypeTag()) { case OBJECT: { IARecordBuilder recordBuilder = new RecordBuilder(); @@ -105,7 +110,7 @@ } } - public static <T> void toBytes(T serializable, ArrayBackedValueStorage buffer, ISerializerDeserializer<T> serde) + static <T> void toBytes(T serializable, ArrayBackedValueStorage buffer, ISerializerDeserializer<T> serde) throws HyracksDataException { buffer.reset(); DataOutput out = buffer.getDataOutput(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java index f544ca0..abb41f2 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java @@ -18,13 +18,14 @@ */ package org.apache.asterix.external.api; +import java.io.Closeable; import java.io.IOException; import java.util.function.LongSupplier; @FunctionalInterface -public interface IRecordConverter<I, O> { +public interface IRecordConverter<I, O> extends Closeable { - public O convert(IRawRecord<? extends I> input) throws IOException; + O convert(IRawRecord<? extends I> input) throws IOException; /** * Configures the converter with information suppliers from the {@link IRecordReader} data source. @@ -33,4 +34,8 @@ */ default void configure(LongSupplier lineNumber) { } + + default void close() { + // default no-op + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 4279ebd..6ef5497 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -252,6 +252,8 @@ } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw HyracksDataException.create(e); + } finally { + dataParser.close(); } return true; } diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java index 1a806cd..17f9a2e 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java @@ -125,4 +125,22 @@ ensureCapacity(bytesRequired); count = bytesRequired; } + + /** + * Reset the stream and shrink the internal buffer if its size is larger than newSize. + * @param newSize + * @return true if the internal buffer was reallocated, false otherwise. + */ + public boolean resetAndShrink(int newSize) { + reset(); + if (buf.length > newSize) { + buf = new byte[newSize]; + return true; + } + return false; + } + + public int capacity() { + return buf.length; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java index 9f01123..cac8719 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java @@ -240,7 +240,8 @@ try { response.close(); } catch (IOException e) { - LOGGER.debug("{} ignoring exception thrown on stream close due to interrupt", description, e); + LOGGER.debug("{} ignoring exception thrown on stream close due to interrupt: {}", description, + String.valueOf(e)); } }); try { @@ -249,7 +250,8 @@ LOGGER.warn("{} did not exit on stream close due to interrupt after 1s", description); readFuture.cancel(true); } catch (ExecutionException ee) { - LOGGER.debug("ignoring exception awaiting aborted {} shutdown", description, ee); + LOGGER.debug("ignoring exception awaiting aborted {} shutdown: {}", description, + String.valueOf(ee.getCause())); } throw ex; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java index 3b6669e..5b9053b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java @@ -84,7 +84,7 @@ return; } catch (HyracksDataException e) { if (isIgnorable(e)) { - LOGGER.debug("Ignoring exception on drop", e); + LOGGER.debug("Ignoring exception on drop: {}", String.valueOf(e)); return; } if (canRetry(e)) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 017e767..2ac29fe 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -19,6 +19,9 @@ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -63,6 +66,8 @@ public class LSMHarness implements ILSMHarness { private static final Logger LOGGER = LogManager.getLogger(); + private static final DateTimeFormatter OP_THREAD_TIMESTAMP = + DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneId.systemDefault()); protected final ILSMIndex lsmIndex; protected final ILSMIOOperationScheduler ioScheduler; @@ -543,7 +548,9 @@ } public void doIo(ILSMIOOperation operation) { + String origName = Thread.currentThread().getName(); try { + Thread.currentThread().setName(threadName(operation)); operation.getCallback().beforeOperation(operation); ILSMDiskComponent newComponent = operation.getIOOpertionType() == LSMIOOperationType.FLUSH ? lsmIndex.flush(operation) : lsmIndex.merge(operation); @@ -569,6 +576,7 @@ lsmIndex, th); } } + Thread.currentThread().setName(origName); } // if the operation failed, we need to cleanup files if (operation.getStatus() == LSMIOOperationStatus.FAILURE) { @@ -576,6 +584,14 @@ } } + private static String threadName(ILSMIOOperation operation) { + if (operation.getIOOpertionType() == LSMIOOperationType.NOOP) { + return String.valueOf(operation.getIOOpertionType()); + } + return operation.getIOOpertionType() + ":" + operation.getTarget().getRelativePath() + "@" + + OP_THREAD_TIMESTAMP.format(ZonedDateTime.now()); + } + @Override public void merge(ILSMIOOperation operation) throws HyracksDataException { if (LOGGER.isDebugEnabled()) { -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20443?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: neo Gerrit-Change-Id: Id8d97f948d048e5c204bb0b78c089420b76dddc9 Gerrit-Change-Number: 20443 Gerrit-PatchSet: 9 Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-CC: Anon. E. Moose #1000171
