Author: schor Date: Wed Apr 10 15:07:12 2013 New Revision: 1466505 URL: http://svn.apache.org/r1466505 Log: [UIMA-2498 UIMA-2778] Add a test case for type filtering during deserialization, and fix the many bugs it exposed. Add support for deserializing delta CASes returned from parallel steps.
Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java?rev=1466505&r1=1466504&r2=1466505&view=diff ============================================================================== --- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java (original) +++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/BinaryCasSerDes6.java Wed Apr 10 15:07:12 2013 @@ -198,6 +198,11 @@ import org.apache.uima.util.impl.Seriali public class BinaryCasSerDes6 { private static final int[] INT0 = new int[0]; + + private static final boolean TRACE_SER = false; + private static final boolean TRACE_DES = false; + + private static final boolean TRACE_STR_ARRAY = false; /** * Version of the serializer/deserializer, used to allow deserialization of * older versions @@ -404,6 +409,7 @@ public class BinaryCasSerDes6 { * Things for just deserialization **********************************/ + private AllowPreexistingFS allowPreexistingFS; private DataInputStream deserIn; private int version; @@ -457,6 +463,7 @@ public class BinaryCasSerDes6 { * For normal serialization - can be null, but if not, is used in place of re-calculating, for speed up * For delta deserialization - must not be null, and is the saved value after serializing to the service * For normal deserialization - must be null + * @param allowPreexistingFSs - if false, throws error if deserializing delta cas has modifications below the mark, for parallel remotes * @param doMeasurements if true, measurements are done (on serialization) * @param compressLevel if not 
null, specifies enum instance for compress level * @param compressStrategy if not null, specifies enum instance for compress strategy @@ -466,7 +473,7 @@ public class BinaryCasSerDes6 { MarkerImpl mark, TypeSystemImpl tgtTs, ReuseInfo rfs, - boolean doMeasurements, + boolean doMeasurements, CompressLevel compressLevel, CompressStrat compressStrategy) { cas = ((CASImpl) ((aCas instanceof JCas) ? ((JCas)aCas).getCas(): aCas)).getBaseCAS(); @@ -482,7 +489,6 @@ public class BinaryCasSerDes6 { this.sm = doMeasurements ? new SerializationMeasures() : null; isDelta = isSerializingDelta = (mark != null); - doMeasurements = (sm != null); typeMapperCmn = typeMapper = ts.getTypeSystemMapper(tgtTs); isTypeMappingCmn = isTypeMapping = (null != typeMapper); @@ -673,6 +679,9 @@ public class BinaryCasSerDes6 { } final int tCode = heap[iHeap]; // get type code final int mappedTypeCode = isTypeMapping ? typeMapper.mapTypeCodeSrc2Tgt(tCode) : tCode; + if (TRACE_SER) { + System.out.format("Ser: adr: %,d tCode: %s %,d tgtTypeCode: %,d %n", iHeap, tCode, ts.getTypeInfo(tCode), mappedTypeCode); + } if (mappedTypeCode == 0) { // means no corresponding type in target system continue; } @@ -760,6 +769,9 @@ public class BinaryCasSerDes6 { break; case Slot_StrRef: for (int i = iHeap + 2; i < endi; i++) { + if (TRACE_STR_ARRAY) { + System.out.format("Trace Str Array Ser: addr: %,d string=%s%n", i, stringHeapObj.getStringForCode(heap[i])); + } writeString(stringHeapObj.getStringForCode(heap[i])); } break; @@ -1541,10 +1553,7 @@ public class BinaryCasSerDes6 { *************************************************************************************/ public void deserialize(InputStream istream) throws IOException { - deserIn = (istream instanceof DataInputStream) ? 
(DataInputStream) istream - : new DataInputStream(istream); - - readHeader(); + readHeader(istream); if (isReadingDelta) { if (!reuseInfoProvided) { @@ -1554,10 +1563,36 @@ public class BinaryCasSerDes6 { cas.resetNoQuestions(); } - deserializeAfterVersion(deserIn, isReadingDelta); + deserializeAfterVersion(deserIn, isReadingDelta, AllowPreexistingFS.allow); } - public void deserializeAfterVersion(DataInputStream istream, boolean isDelta) throws IOException { + /** + * Version used by uima-as to read a delta CAS from remote parallel steps; throws UnsupportedOperationException if the stream does not hold a delta CAS, or if no reuse info from the original serialization was provided + * @param istream the stream to read the serialized delta CAS from + * @param allowPreexistingFS whether modifications to Feature Structures below the mark are allowed (disallow makes such modifications an error); AllowPreexistingFS.ignore is rejected + * @throws IOException if reading from the stream fails + */ + public void deserialize(InputStream istream, AllowPreexistingFS allowPreexistingFS) throws IOException { + readHeader(istream); + + if (isReadingDelta) { + if (!reuseInfoProvided) { + throw new UnsupportedOperationException("Deserializing Delta Cas, but original not serialized from"); + } + } else { + throw new UnsupportedOperationException("Delta CAS required for this call"); + } + + deserializeAfterVersion(deserIn, isReadingDelta, allowPreexistingFS); + } + + + public void deserializeAfterVersion(DataInputStream istream, boolean isDelta, AllowPreexistingFS allowPreexistingFS) throws IOException { + + if (allowPreexistingFS == AllowPreexistingFS.ignore) { // validate before changing any instance state + throw new UnsupportedOperationException("AllowPreexistingFS.ignore not an allowed setting"); + } + this.allowPreexistingFS = allowPreexistingFS; deserIn = istream; this.isDelta = isReadingDelta = isDelta; @@ -1604,7 +1639,6 @@ public class BinaryCasSerDes6 { // Above the merge line: only the 2nd is possible if (isReadingDelta) { - // scan current source being added to / merged into if (!reuseInfoProvided) { throw new IllegalStateException("Reading Delta into CAS not serialized from"); } @@ -1621,6 +1655,7 @@ public class BinaryCasSerDes6 { } final int tgtTypeCode = readVnumber(typeCode_dis); // get type code final int srcTypeCode = isTypeMapping ?
typeMapper.mapTypeCodeTgt2Src(tgtTypeCode) : tgtTypeCode; + final boolean storeIt = (srcTypeCode != 0); // A receiving client from a service always // has a superset of the service's types due to type merging so this @@ -1629,8 +1664,8 @@ public class BinaryCasSerDes6 { // deleted a type. // The strategy for deserializing heap refs depends on finding - // the prev value for that type. This can be skipped for - // types being skipped because they don't exist in the source + // the prev value for that type. This must be done in the context + // of the sending CAS's type system // typeInfo is Target Type Info final TypeInfo tgtTypeInfo = isTypeMapping ? tgtTs.getTypeInfo(tgtTypeCode) : @@ -1643,7 +1678,14 @@ public class BinaryCasSerDes6 { typeInfo = tgtTypeInfo; initPrevIntValue(iHeap); // note "typeInfo" a hidden parameter - ugly... } - typeInfo = srcTypeInfo; + if (TRACE_DES) { + System.out.format("Des: addr %,d tgtTypeCode: %,d %s srcTypeCode: %,d%n", iHeap, tgtTypeCode, tgtTypeInfo, srcTypeCode); + } + +// if (srcTypeInfo == null) { +// typeInfo = null; // debugging +// } + typeInfo = storeIt ? srcTypeInfo : tgtTypeInfo; // if !storeIt, then srcTypeInfo is null. 
fsStartIndexes.addSrcAddrForTgt(iHeap, storeIt); if (storeIt) { @@ -1667,7 +1709,7 @@ public class BinaryCasSerDes6 { final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(srcTypeCode); for (int i = 0; i < tgtFeatOffsets2Src.length; i++) { final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1; - SlotKind kind = typeInfo.getSlotKind(featOffsetInSrc); + SlotKind kind = tgtTypeInfo.slotKinds[i]; // target kind , may not exist in src readByKind(iHeap, featOffsetInSrc, kind, storeIt); } } else { @@ -1767,7 +1809,7 @@ public class BinaryCasSerDes6 { for (int i = startIheap; i < endi; i++) { final int v = readDiff(arrayElementKind, prev); prev = v; - if (startIheap == i && isUpdatePrevOK) { + if (startIheap == i && isUpdatePrevOK && storeIt) { updatePrevIntValue(iHeap, 2, v); } if (storeIt) { @@ -1790,7 +1832,10 @@ public class BinaryCasSerDes6 { break; case Slot_StrRef: for (int i = iHeap + 2; i < endi; i++) { - final int strRef = readString(storeIt); + final int strRef = readString(storeIt); + if (TRACE_STR_ARRAY) { + System.out.format("Trace String Array Des addr: %,d storeIt=%s, string=%s%n", i, storeIt ? "Y" : "N", stringHeapObj.getStringForCode(strRef)); + } if (storeIt) { heap[i] = strRef; } @@ -1809,7 +1854,7 @@ public class BinaryCasSerDes6 { */ private void readByKind(int iHeap, int offset, SlotKind kind, boolean storeIt) throws IOException { - if (offset == -1) { + if (offset == 0) { storeIt = false; } switch (kind) { @@ -1842,7 +1887,7 @@ public class BinaryCasSerDes6 { } break; case Slot_LongRef: { - long v = readLongOrDouble(kind, (iPrevHeap == 0) ? 0L : longHeapObj.getHeapValue(heap[iPrevHeap + offset])); + long v = readLongOrDouble(kind, (!storeIt || (iPrevHeap == 0)) ? 
0L : longHeapObj.getHeapValue(heap[iPrevHeap + offset])); if (v == 0L) { if (longZeroIndex == -1) { longZeroIndex = longHeapObj.addLong(0L); @@ -1954,7 +1999,7 @@ public class BinaryCasSerDes6 { byte_dis.readFully(byteHeapObj.heap, startPos, length); return startPos; } else { - byte_dis.skipBytes(length); + skipBytes(byte_dis, length); return 0; } } @@ -1970,7 +2015,7 @@ public class BinaryCasSerDes6 { } return startPos; } else { - short_dis.skipBytes(length * 2); + skipBytes(short_dis, length * 2); return 0; } } @@ -2009,7 +2054,7 @@ public class BinaryCasSerDes6 { } else { v = readDiff(kind, 0); } - if (isUpdatePrevOK) { + if (storeIt && isUpdatePrevOK) { updatePrevIntValue(iHeap, offset, v); } } @@ -2057,8 +2102,8 @@ public class BinaryCasSerDes6 { private void skipLong(final int length) throws IOException { for (int i = 0; i < length; i++) { - long_High_dis.skipBytes(8); - long_Low_dis.skipBytes(8); + skipBytes(long_High_dis, 8); + skipBytes(long_Low_dis, 8); } } @@ -2126,15 +2171,19 @@ public class BinaryCasSerDes6 { return 0; } if (1 == length) { - if (storeIt) { + // always store, in case later offset ref +// if (storeIt) { return stringHeapObj.addString(""); - } else { - return 0; - } +// } else { +// return 0; +// } } if (length < 0) { // in this case, -length is the slot index if (storeIt) { + if (TRACE_STR_ARRAY) { + System.out.format("Trace String Array Des ref to offset %,d%n", length); + } return stringTableOffset - length; } else { return 0; @@ -2146,11 +2195,20 @@ public class BinaryCasSerDes6 { if (debugEOF) { System.out.format("readString offset = %,d%n", offset); } - if (storeIt) { + // need to store all strings, because an otherwise skipped one may be referenced + // later as an offset into the string table +// if (storeIt) { String s = readCommonString[segmentIndex].substring(offset, offset + length - 1); return stringHeapObj.addString(s); - } else { - return 0; +// } else { +// return 0; +// } + } + + private void skipBytes(DataInputStream
stream, int skipNumber) throws IOException { + final int r = stream.skipBytes(skipNumber); + if (r != skipNumber) { // skipBytes may skip fewer bytes than requested (e.g. at end of stream), and returns 0 when skipNumber is 0 (empty array), which is not an error + throw new IOException(String.format("%,d bytes skipped when %,d were requested, causing out-of-sync while deserializing", r, skipNumber)); } } @@ -2188,6 +2246,13 @@ public class BinaryCasSerDes6 { final int modFSsLength = readVnumber(control_dis); int prevSeq = 0; + if ((modFSsLength > 0) && (allowPreexistingFS == AllowPreexistingFS.disallow)) { + CASRuntimeException e = new CASRuntimeException( + CASRuntimeException.DELTA_CAS_PREEXISTING_FS_DISALLOWED, + new String[] {String.format("%,d pre-existing Feature Structures modified", modFSsLength)}); + throw e; + } + // if (isTypeMapping) { // for (int i = 0; i < AuxHeapsCount; i++) { // srcHeapIndexOffset[i] = 0; @@ -2205,7 +2270,7 @@ public class BinaryCasSerDes6 { iHeap = fsStartIndexes.getSrcAddrFromTgtSeq(seqNbrModified); if (iHeap < 1) { - // never happen because delta CAS ts system case, the + // never happen because in the delta CAS ts system use-case, the // target is always a subset of the source // due to type system merging throw new RuntimeException("never happen"); @@ -2560,9 +2625,9 @@ public class BinaryCasSerDes6 { if (isTypeMapping) { final int[] tgtFeatOffsets2Src = typeMapper.getTgtFeatOffsets2Src(tCode); - if (tgtFeatOffsets2Src == null ) { - System.out.println("debug caught"); - } +// if (tgtFeatOffsets2Src == null ) { +// System.out.println("debug caught"); +// } for (int i = 0; i < tgtFeatOffsets2Src.length; i++) { final int featOffsetInSrc = tgtFeatOffsets2Src[i] + 1; // add one for origin 1 if (featOffsetInSrc == 0) { @@ -2693,6 +2758,8 @@ public class BinaryCasSerDes6 { * Compare 2 CASes, with perhaps different type systems. * If the type systems are different, construct a type mapper and use that * to selectively ignore types or features not in other type system + * + * The Mapper filters C1 -> C2.
* * Compare only feature structures reachable via indexes or refs * The order must match @@ -2820,11 +2887,11 @@ public class BinaryCasSerDes6 { continue; } if ((tCode1_2 == 0) && (tCode2_1 != 0)) { - i2++; + i1++; continue; } if ((tCode1_2 != 0) && (tCode2_1 == 0)) { - i1++; + i2++; continue; } } else { // not type mapping @@ -2837,7 +2904,7 @@ public class BinaryCasSerDes6 { } } - if (i1 >= c1FoundFSs.length && i1 >= c2FoundFSs.length) { + if (i1 >= c1FoundFSs.length && i2 >= c2FoundFSs.length) { return true; // end, everything compared } if (isTypeMapping) { @@ -3083,6 +3150,7 @@ public class BinaryCasSerDes6 { private StringBuilder dumpHeapFs(CASImpl cas, final int iHeap, final TypeSystemImpl ts) { StringBuilder sb = new StringBuilder(); typeInfo = ts.getTypeInfo(cas.getHeap().heap[iHeap]); + sb.append("Heap Addr: ").append(iHeap).append(' '); sb.append(typeInfo).append(' '); if (typeInfo.isHeapStoredArray) { @@ -3415,7 +3483,10 @@ public class BinaryCasSerDes6 { } } - private void readHeader() throws IOException { + private void readHeader(InputStream istream) throws IOException { + deserIn = (istream instanceof DataInputStream) ? 
(DataInputStream) istream + : new DataInputStream(istream); + // key // determine if byte swap if needed based on key byte[] bytebuf = new byte[4]; Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java?rev=1466505&r1=1466504&r2=1466505&view=diff ============================================================================== --- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java (original) +++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/cas/impl/CASImpl.java Wed Apr 10 15:07:12 2013 @@ -1200,7 +1200,7 @@ public class CASImpl extends AbstractCas if (compressedVersion == 0) { (new BinaryCasSerDes4(this.getTypeSystemImpl(), false)).deserialize(this, dis, delta); } else { - (new BinaryCasSerDes6(this, rfs)).deserializeAfterVersion(dis, delta); + (new BinaryCasSerDes6(this, rfs)).deserializeAfterVersion(dis, delta, AllowPreexistingFS.allow); } return; } Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java?rev=1466505&r1=1466504&r2=1466505&view=diff ============================================================================== --- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java (original) +++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/cas/impl/SerDesTest6.java Wed Apr 10 15:07:12 2013 @@ -224,9 +224,9 @@ public class SerDesTest6 extends TestCas public SerDesTest6() { Random sg = new Random(); long seed = sg.nextLong(); -// seed = -2659090483652661635L; +// seed = 2934127305128325787L; random = new Random(seed); -// System.out.format("RandomSeed: %,d%n", seed); + System.out.format("RandomSeed: %,d%n", seed); mSrc = setupTTypeSystem(TwoTypes); casSrc = mSrc.cas; @@ -283,6 +283,36 
@@ public class SerDesTest6 extends TestCas public void tearDown() { } + /** + * Make one of each kind of artifact, including arrays; + * serialize to a byte stream, deserialize into a new CAS, and compare the two CASes. + */ + + public void testAllKinds() { + if (doPlain) { + serdesSimple(getTT(EqTwoTypes)); + } else { + for (TTypeSystem m : alternateTTypeSystems) { + switch (m.kind){ + // note: case statements *not* grouped in order to facilitate debugging + case OneTypeSubsetFeatures: + serdesSimple(m); + break; + case TwoTypesSubsetFeatures: + serdesSimple(m); + break; + case TwoTypes: + case EqTwoTypes: + case OneType: + case TwoTypesNoFeatures: + serdesSimple(m); + break; + } + } + } + } + + // Test chains going through filtered type // Repeat below with OneType, and TwoTypes with filtered slot == fsRef @@ -388,39 +418,17 @@ public class SerDesTest6 extends TestCas verifyDelta(marker, ri); } - /** - * Make one of each kind of artifact, including arrays - * serialize to byte stream, deserialize into new cas, compare - */ - - public void testAllKinds() { - if (doPlain) { - serdesSimple(getTT(EqTwoTypes)); - } else { - for (TTypeSystem m : alternateTTypeSystems) { - switch (m.kind){ - case OneTypeSubsetFeatures: - serdesSimple(m); - break; - case TwoTypesSubsetFeatures: - serdesSimple(m); - break; - case TwoTypes: - case EqTwoTypes: - case OneType: - case TwoTypesNoFeatures: - serdesSimple(m); - break; - } - } - } - } private void serdesSimple(TTypeSystem m) { remoteCas = setupCas(m); casSrc.reset(); loadCas(casSrc, mSrc); - verify(remoteCas); + verify(remoteCas); + + // test case where serialization is done without type filtering, + // and deserialization is done with filtering + remoteCas.reset(); + verifyDeserFilter(remoteCas); } /** @@ -1131,6 +1139,29 @@ public class SerDesTest6 extends TestCas throw new RuntimeException(e); } } + + private void verifyDeserFilter(CASImpl casTgt) { + // serialize casSrc without a type filter, deserialize into casTgt (passing casSrc's type system so filtering happens on the way in), then compare the two CASes + BinaryCasSerDes6 bcs = null; + try { + ByteArrayOutputStream baos = new
ByteArrayOutputStream(1024); + if (doPlain) { + return; + } else { + bcs = new BinaryCasSerDes6(casSrc, (ReuseInfo) null); + bcs.serialize(baos); + } + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + bcs = new BinaryCasSerDes6(casTgt, null, casSrc.getTypeSystemImpl()); + bcs.deserialize(bais); + + bcs = new BinaryCasSerDes6(casSrc, null, casTgt.getTypeSystemImpl()); + assertTrue(bcs.compareCASes(casSrc, casTgt)); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } // casSrc -> remoteCas private ReuseInfo[] serializeDeserialize(