- Update Parquet writer to always make a binary copy of data held for statistics purposes. (Fixes a JVM crash in certain cases.)
- Update WriterRecordBatch to stop doing premature cleanup. When a downstream operator was still holding memory allocated by the writer record batch, it was possible that the allocator would be closed before that memory had been released.
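
The first bullet is delivered through the parquet-column version bump in pom.xml below. As a rough sketch of the failure mode only (MinMaxStats here is a hypothetical class, not the parquet-column Statistics API): if a statistics holder retains a reference into a buffer the writer reuses, its recorded min/max silently changes, and with off-heap memory a stale reference like this can crash the JVM. Always taking a binary copy avoids it.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical statistics holder, NOT the parquet-column Statistics API;
// it only illustrates why the writer must copy bytes it wants to keep.
class MinMaxStats {
  byte[] minByReference; // unsafe: aliases the writer's reusable buffer
  byte[] minByCopy;      // safe: owns its own bytes

  void update(byte[] buffer, int offset, int length) {
    if (minByReference == null) {
      minByReference = buffer; // retains a live reference into the buffer
      minByCopy = Arrays.copyOfRange(buffer, offset, offset + length); // binary copy
    }
    // (a real implementation would also compare values and track a max)
  }
}

public class StatsCopyDemo {
  public static void main(String[] args) {
    byte[] reusable = new byte[8]; // stands in for a buffer the writer recycles
    MinMaxStats stats = new MinMaxStats();

    // First value passes through the buffer and is recorded as the min.
    System.arraycopy("aardvark".getBytes(StandardCharsets.UTF_8), 0, reusable, 0, 8);
    stats.update(reusable, 0, 8);

    // The writer then reuses the same buffer for the next value...
    System.arraycopy("zebrazeb".getBytes(StandardCharsets.UTF_8), 0, reusable, 0, 8);

    // ...and the by-reference "min" has silently changed underneath us.
    // With direct (off-heap) memory, as Drill uses, a stale reference can
    // point at freed memory and crash the JVM rather than just print garbage.
    System.out.println("min by reference: " + new String(stats.minByReference, StandardCharsets.UTF_8));
    System.out.println("min by copy:      " + new String(stats.minByCopy, StandardCharsets.UTF_8));
  }
}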
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/06f0e178
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/06f0e178
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/06f0e178

Branch: refs/heads/master
Commit: 06f0e178e7738dc7ecd81faad8d1f73003c84fb8
Parents: 66d5be4
Author: Jacques Nadeau <[email protected]>
Authored: Wed Nov 5 18:59:40 2014 -0800
Committer: Jinfeng Ni <[email protected]>
Committed: Fri Nov 7 10:50:57 2014 -0800

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |  2 +-
 .../exec/physical/impl/WriterRecordBatch.java   | 82 ++++++++++----------
 2 files changed, 42 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/06f0e178/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 55721d1..0dea38a 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -125,7 +125,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.5.1-drill-r4</version>
+      <version>1.5.1-drill-r5</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/06f0e178/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index acbb815..07302d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -88,7 +88,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   @Override
   public IterOutcome innerNext() {
     if(processed) {
-      cleanup();
+//      cleanup();
       // if the upstream record batch is already processed and next() is called by
       // downstream then return NONE to indicate completion
       return IterOutcome.NONE;
@@ -96,48 +96,52 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {

     // process the complete upstream in one next() call
     IterOutcome upstream;
-    do {
-      upstream = next(incoming);
-
-      switch(upstream) {
-        case NOT_YET:
-        case NONE:
-        case STOP:
-          if (upstream == IterOutcome.STOP) {
-            return upstream;
-          }
-          break;
-
-        case OK_NEW_SCHEMA:
-          try{
-            setupNewSchema();
-          } catch(Exception ex) {
-            kill(false);
-            logger.error("Failure during query", ex);
-            context.fail(ex);
+    try{
+      do {
+        upstream = next(incoming);
+
+        switch(upstream) {
+          case STOP:
             return IterOutcome.STOP;
-          }
-          // fall through.
-        case OK:
-          try {
+
+          case NOT_YET:
+          case NONE:
+            break;
+
+          case OK_NEW_SCHEMA:
+            setupNewSchema();
+            // fall through.
+          case OK:
             counter += eventBasedRecordWriter.write(incoming.getRecordCount());
             logger.debug("Total records written so far: {}", counter);
-          } catch(IOException ex) {
-            throw new RuntimeException(ex);
-          }
-          for(VectorWrapper v : incoming) {
-            v.getValueVector().clear();
-          }
-          break;
+            for(VectorWrapper<?> v : incoming) {
+              v.getValueVector().clear();
+            }
+            break;
+
+          default:
+            throw new UnsupportedOperationException();
+        }
+      } while(upstream != IterOutcome.NONE);
+    }catch(Exception ex){
+      kill(false);
+      logger.error("Failure during query", ex);
+      context.fail(ex);
+      return IterOutcome.STOP;
+    }

-        default:
-          throw new UnsupportedOperationException();
-      }
-    } while(upstream != IterOutcome.NONE);
+    addOutputContainerData();
+    processed = true;
+    return IterOutcome.OK_NEW_SCHEMA;
+  }

-    VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById(VarCharVector.class, container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds()).getValueVector();
+  private void addOutputContainerData(){
+    VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById( //
+        VarCharVector.class, //
+        container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds() //
+        ).getValueVector();
     AllocationHelper.allocate(fragmentIdVector, 1, 50);
     BigIntVector summaryVector = (BigIntVector) container.getValueAccessorById(BigIntVector.class,
         container.getValueVectorId(SchemaPath.getSimplePath("Number of records written")).getFieldIds()).getValueVector();
@@ -148,9 +152,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {

     summaryVector.getMutator().setValueCount(1);
     container.setRecordCount(1);
-    processed = true;
-
-    return IterOutcome.OK_NEW_SCHEMA;
   }

   protected void setupNewSchema() throws Exception {
@@ -167,8 +168,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
       container.addOrGet(fragmentIdField);
       container.addOrGet(summaryField);
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    } catch(IOException ex) {
-      throw new RuntimeException("Failed to update schema in RecordWriter", ex);
     } finally{
       stats.stopSetup();
    }
@@ -186,6 +185,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
         recordWriter.cleanup();
       }
     } catch(IOException ex) {
+      logger.error("Failure while closing record writer", ex);
       throw new RuntimeException("Failed to close RecordWriter", ex);
     }
   }
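
For the WriterRecordBatch change above: the old code called cleanup() at the start of the next innerNext() invocation, which could close the batch's allocator while a downstream operator still held vectors backed by it. A minimal sketch of that ordering hazard, using hypothetical ToyAllocator/ToyBuffer classes in place of Drill's real allocator and buffers:

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for Drill's allocator and buffers, used only to
// show the ordering bug the commit fixes; names and behavior are illustrative.
class ToyAllocator {
  private final AtomicInteger outstanding = new AtomicInteger();

  ToyBuffer allocate() {
    outstanding.incrementAndGet();
    return new ToyBuffer(this);
  }

  void released() {
    outstanding.decrementAndGet();
  }

  // A real allocator that frees backing memory here would leave any
  // still-held buffer dangling -- with direct memory, a JVM crash.
  void close() {
    if (outstanding.get() != 0) {
      throw new IllegalStateException(outstanding.get() + " buffer(s) still held");
    }
  }
}

class ToyBuffer {
  private final ToyAllocator owner;
  ToyBuffer(ToyAllocator owner) { this.owner = owner; }
  void release() { owner.released(); }
}

public class PrematureCleanupDemo {
  public static void main(String[] args) {
    ToyAllocator writerAllocator = new ToyAllocator();

    // The writer batch hands its output (e.g. the "Fragment" and summary
    // vectors) downstream, which keeps that memory alive.
    ToyBuffer heldDownstream = writerAllocator.allocate();

    // Old behavior: the next innerNext() call ran cleanup() immediately,
    // before downstream had released what it was given.
    try {
      writerAllocator.close();
    } catch (IllegalStateException e) {
      System.out.println("premature cleanup rejected: " + e.getMessage());
    }

    // Fixed ordering: cleanup is deferred until downstream releases first.
    heldDownstream.release();
    writerAllocator.close();
    System.out.println("cleanup after release succeeded");
  }
}

With the commit applied, the early cleanup() call in innerNext() is disabled, so the writer's resources stay live until the operator is actually torn down, after downstream consumers have released their buffers.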
