nsivabalan edited a comment on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-792339635
If I am not wrong, there was a bug in the code version that you ran. I tested 4 different variants of the code to arrive at the latest proposal. Let me walk through them :) Sorry about the lengthy response; hopefully we can get closure.

1st variant: current master branch.

```
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
LOG.warn("HoodieLogFileReader :: canonical name :: " + fsDataInputStream.getClass().getCanonicalName()
    + ", name " + fsDataInputStream.getClass().getName());
if (FSUtils.isGCSInputStream(fsDataInputStream)) {
  LOG.warn("HoodieLogFileReader :: 111 start GCSFileSystem "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
  this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
      new BufferedFSInputStream((FSInputStream) ((
          (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
  LOG.warn("HoodieLogFileReader :: 111 completed ");
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
  LOG.warn("HoodieLogFileReader :: 222 start "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
  this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
      new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
  LOG.warn("HoodieLogFileReader :: 222 complete");
} else {
  LOG.warn("HoodieLogFileReader :: 333 ");
  // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
  // need to wrap in another BufferedFSInputStream to make bufferSize work?
  this.inputStream = fsDataInputStream;
}
```

Output from my run:

"HoodieLogFileReader :: canonical name :: org.apache.hadoop.fs.FSDataInputStream, name org.apache.hadoop.fs.FSDataInputStream"
"HoodieLogFileReader :: 111 start GCSFileSystem com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream"
Caused by: java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop.fs.FSDataInputStream
at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:84)
at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:131)
... 24 more

2nd variant: this PR just before my last commit.

```
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
  LOG.warn("HoodieLogFileReader 1111 " + logFile.getFileName() + " "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
  inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
      new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
} else if (FSUtils.isGCSFileSystem(fs)) {
  LOG.warn("HoodieLogFileReader 2222 aaa " + logFile.getFileName() + " "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
  try {
    FSInputStream localFSInputStream =
        (FSInputStream) (((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream());
    inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(),
        new FSDataInputStream(new BufferedFSInputStream(localFSInputStream, bufferSize))), true);
    LOG.warn("HoodieLogFileReader 2222 aaa succeeded " + logFile.getFileName());
  } catch (ClassCastException e) {
    LOG.warn("HoodieLogFileReader 2222 bbb (aaa failed) " + logFile.getFileName() + " "
        + e.getCause() + ", msg " + e.getMessage());
    // if we cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream, fall back to using it as is
    LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original "
        + "fsDataInputStream");
    inputStreamLocal = fsDataInputStream;
  }
} else {
  // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
  // need to wrap in another BufferedFSInputStream to make bufferSize work?
  LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
  inputStreamLocal = fsDataInputStream;
}
```

Output from the run:

"HoodieLogFileReader 1111 .0d7ba334-2847-4b24-997e-1dbecfd12e3b-0_20210306132835.log.1_0-55-75 com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream"

So what this essentially means is that, for GCSFileSystem, fsDataInputStream.getWrappedStream() is an instance of FSInputStream. Execution therefore takes the first branch and never even reaches the GCS `else if` block, which is the branch we actually intended it to take.

3rd variant: just to check whether fsDataInputStream.getWrappedStream() is an instance of FSDataInputStream or of FSInputStream.

```
if (FSUtils.isGCSFileSystem(fs)) {
  LOG.warn("HoodieLogFileReader 111 aaa " + logFile.getFileName()
      + " can_name: " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()
      + ". Is wrappedStream instance of fsDataInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream)
      + " , is wrappedSTream instance of fsInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSInputStream));
  try {
    FSInputStream localFSInputStream =
        (FSInputStream) (((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream());
    inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(),
        new FSDataInputStream(new BufferedFSInputStream(localFSInputStream, bufferSize))), true);
    LOG.warn("HoodieLogFileReader 111 aaa succeeded " + logFile.getFileName());
  } catch (ClassCastException e) {
    LOG.warn("HoodieLogFileReader 111 bbb (aaa failed) " + logFile.getFileName() + " "
        + e.getCause() + ", msg " + e.getMessage());
    // if we cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream, fall back to using it as is
    LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original "
        + "fsDataInputStream");
    inputStreamLocal = fsDataInputStream;
  }
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
  LOG.warn("HoodieLogFileReader 222 " + logFile.getFileName() + " "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
  inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
      new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
  LOG.warn("HoodieLogFileReader 222 completed ");
} else {
  // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
  // need to wrap in another BufferedFSInputStream to make bufferSize work?
  LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
  inputStreamLocal = fsDataInputStream;
}
```

Output from the run:

"HoodieLogFileReader 111 aaa .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 can_name: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream. Is wrappedStream instance of fsDataInputStream false , is wrappedSTream instance of fsInputStream true"
"HoodieLogFileReader 111 bbb (aaa failed) .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 null, msg com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop.fs.FSDataInputStream"
"Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original fsDataInputStream"
...

After this, the seek ran into an issue:

Caused by: java.io.EOFException: Invalid seek offset: position value (1584) must be between 0 and 1584 for 'gs://dataproc-staging-us- ...

So, because we hit the ClassCastException, we never used the SchemeAwareFSDataInputStream class at all, and hence ran into the seek issue.

4th variant: my latest commit with the proposed fix.

```
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
  LOG.warn("HoodieLogFileReader 111 start " + logFile.getFileName());
  inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
      new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
  LOG.warn("HoodieLogFileReader 111 completed ");
  if (FSUtils.isGCSFileSystem(fs)) {
    LOG.warn("HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream");
    inputStreamLocal = new SchemeAwareFSDataInputStream(inputStreamLocal, true);
  }
} else {
  // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
  // need to wrap in another BufferedFSInputStream to make bufferSize work?
  LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
  inputStreamLocal = fsDataInputStream;
}
```

Output from the run:

"HoodieLogFileReader 111 start .7a1a0684-b710-4a44-97c4-4c98b75db8a2-0_20210306142209.log.1_2-55-76"
"HoodieLogFileReader 111 completed "
"HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream"

No exceptions; all good.

Summary: at some point we came up with two conditions. Condition 1 checks whether fsDataInputStream.getWrappedStream() is an instance of FSInputStream, and condition 2 caters to GCSFileSystem. But in reality, GCSFileSystem also satisfies the first condition, i.e. fsDataInputStream.getWrappedStream() instanceof FSInputStream, so the GCS branch was never reached. Hence the proposed fix: wrap with BufferedFSInputStream in the first branch as before, and additionally wrap with SchemeAwareFSDataInputStream when the file system is GCS.
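To make the branch-ordering problem concrete, here is a minimal, dependency-free Java sketch. All class names are hypothetical stand-ins, not the real Hadoop/Hudi types. FakeFSInputStream plays FSInputStream. FakeGcsInputStream plays GoogleHadoopFSInputStream, which according to the logs above is an FSInputStream. FakeDataStream plays FSDataInputStream. Strings stand in for the actual wrappers. The sketch shows that with variant 2's ordering the GCS branch is unreachable, while variant 4's nesting applies both wrappers. It also shows why the double unwrap in variants 1 and 3 throws.

```java
// Hypothetical stand-ins only; these are NOT the real Hadoop/Hudi classes.
public class WrapOrderSketch {
    // Stand-in for org.apache.hadoop.fs.FSInputStream
    static class FakeFSInputStream {}

    // Stand-in for GoogleHadoopFSInputStream, which (per the logs) IS an FSInputStream
    static class FakeGcsInputStream extends FakeFSInputStream {}

    // Stand-in for FSDataInputStream: it just holds the stream it wraps
    static class FakeDataStream {
        final Object wrapped;
        FakeDataStream(Object wrapped) { this.wrapped = wrapped; }
        Object getWrappedStream() { return wrapped; }
    }

    // Variant 2 ordering: the generic check comes first, so the GCS branch is never taken for GCS streams.
    static String variant2(FakeDataStream in, boolean isGcs) {
        if (in.getWrappedStream() instanceof FakeFSInputStream) {
            return "buffered";
        } else if (isGcs) {
            return "buffered+schemeAware";
        }
        return "as-is";
    }

    // Variant 4 ordering: buffer first, then add the GCS-specific wrapper inside the same branch.
    static String variant4(FakeDataStream in, boolean isGcs) {
        if (in.getWrappedStream() instanceof FakeFSInputStream) {
            String result = "buffered";
            if (isGcs) {
                result += "+schemeAware";
            }
            return result;
        }
        return "as-is";
    }

    public static void main(String[] args) {
        FakeDataStream gcs = new FakeDataStream(new FakeGcsInputStream());
        System.out.println("variant 2 on GCS: " + variant2(gcs, true));
        System.out.println("variant 4 on GCS: " + variant4(gcs, true));
        // Variants 1 and 3 assumed a second FSDataInputStream layer; here the wrapped
        // object is the input stream itself, so this cast throws at runtime.
        try {
            FakeDataStream inner = (FakeDataStream) gcs.getWrappedStream();
            System.out.println("cast succeeded: " + inner);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in variants 1 and 3");
        }
    }
}
```

Running main prints `buffered` for variant 2 and `buffered+schemeAware` for variant 4, followed by the ClassCastException line. Nesting the GCS check inside the FSInputStream branch, as variant 4 does, avoids relying on the order of mutually overlapping conditions.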