nsivabalan edited a comment on pull request #2500:
URL: https://github.com/apache/hudi/pull/2500#issuecomment-792339635


   If I am not wrong, there was a bug in the code version that you ran. I 
tested 4 different variants of the code to arrive at the latest proposal. Let me 
walk through them :) Sorry about the lengthy response. Hopefully this gets us to 
closure. 
   
   1st variant. Current master branch:
   
   ```
   FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
   LOG.warn("HoodieLogFileReader :: canonical name :: " + fsDataInputStream.getClass().getCanonicalName()
       + ", name " + fsDataInputStream.getClass().getName());
   if (FSUtils.isGCSInputStream(fsDataInputStream)) {
     LOG.warn("HoodieLogFileReader :: 111 start GCSFileSystem "
         + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
     this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
         new BufferedFSInputStream((FSInputStream) ((
             (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
     LOG.warn("HoodieLogFileReader :: 111 completed ");
   } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
     LOG.warn("HoodieLogFileReader :: 222 start "
         + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
     this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
         new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     LOG.warn("HoodieLogFileReader :: 222 complete");
   } else {
     LOG.warn("HoodieLogFileReader :: 333 ");
     // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
     // need to wrap in another BufferedFSInputStream to make bufferSize work?
     this.inputStream = fsDataInputStream;
   }
   ```
   
   Output from my run:
   
   "HoodieLogFileReader :: canonical name :: org.apache.hadoop.fs.FSDataInputStream, name org.apache.hadoop.fs.FSDataInputStream" 
   
   "HoodieLogFileReader :: 111 start GCSFileSystem com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream" 
   
   Caused by: java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop.fs.FSDataInputStream
           at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:84)
           at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
           at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:131)
           ... 24 more
   
   
   2nd variant: 
   This PR just before my last commit. 
   ```
   if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
     LOG.warn("HoodieLogFileReader 1111 " + logFile.getFileName() + " "
         + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
     inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
         new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
   } else if (FSUtils.isGCSFileSystem(fs)) {
     LOG.warn("HoodieLogFileReader 2222 aaa " + logFile.getFileName() + " "
         + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
     try {
       FSInputStream localFSInputStream =
           (FSInputStream) (((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream());
       inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(),
           new FSDataInputStream(new BufferedFSInputStream(localFSInputStream, bufferSize))), true);
       LOG.warn("HoodieLogFileReader 2222 aaa succeeded " + logFile.getFileName());
     } catch (ClassCastException e) {
       LOG.warn("HoodieLogFileReader 2222 bbb (aaa failed) " + logFile.getFileName() + " " + e.getCause()
           + ", msg " + e.getMessage());
       // if we cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream,
       // fall back to using it as is
       LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with "
           + "GCSFileSystem, falling back to original fsDataInputStream");
       inputStreamLocal = fsDataInputStream;
     }
   } else {
     // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
     // need to wrap in another BufferedFSInputStream to make bufferSize work?
     LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
     inputStreamLocal = fsDataInputStream;
   }
   ```
   
   Output from the run:
   "HoodieLogFileReader 1111 .0d7ba334-2847-4b24-997e-1dbecfd12e3b-0_20210306132835.log.1_0-55-75 com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream" 
   
   So what this essentially means is that fsDataInputStream.getWrappedStream() 
instanceof FSInputStream is true for GCSFileSystem as well. Execution takes the 
first branch and never reaches the GCS-specific else-if block, which is where we 
intended GCS to go. 
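
To make the type relationship concrete, here is a minimal, self-contained sketch. The `StandIn*` classes are hypothetical stand-ins that only mirror the inheritance visible in the logs above (the GCS stream is an FSInputStream, and FSDataInputStream wraps another stream); they are not the real Hadoop or GCS connector classes. It shows why the instanceof check succeeds while the double unwrap from the 1st variant fails.

```java
import java.io.DataInputStream;
import java.io.InputStream;

class WrappedStreamTypeCheck {
  // Hypothetical stand-in for org.apache.hadoop.fs.FSInputStream.
  abstract static class StandInFSInputStream extends InputStream { }

  // Hypothetical stand-in for org.apache.hadoop.fs.FSDataInputStream.
  static class StandInFSDataInputStream extends DataInputStream {
    StandInFSDataInputStream(InputStream in) {
      super(in);
    }

    InputStream getWrappedStream() {
      return in; // protected field inherited from FilterInputStream
    }
  }

  // Hypothetical stand-in for GoogleHadoopFSInputStream.
  static class StandInGcsInputStream extends StandInFSInputStream {
    @Override
    public int read() {
      return -1; // empty stream; the content is irrelevant for the type check
    }
  }

  // Condition used by the first branch in the 2nd variant.
  static boolean wrapsFSInputStream(StandInFSDataInputStream stream) {
    return stream.getWrappedStream() instanceof StandInFSInputStream;
  }

  // Double unwrap attempted by the GCS branch; false means the cast failed.
  static boolean doubleUnwrapSucceeds(StandInFSDataInputStream stream) {
    try {
      InputStream inner =
          ((StandInFSDataInputStream) stream.getWrappedStream()).getWrappedStream();
      return inner != null;
    } catch (ClassCastException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    StandInFSDataInputStream opened =
        new StandInFSDataInputStream(new StandInGcsInputStream());
    System.out.println("wraps FSInputStream: " + wrapsFSInputStream(opened));   // true
    System.out.println("double unwrap works: " + doubleUnwrapSucceeds(opened)); // false
  }
}
```

With these stand-ins, any if/else-if chain that tests the instanceof condition first will send the GCS stream down the generic branch.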
   
   3rd variant: GCS check moved first, just to check whether 
fsDataInputStream.getWrappedStream() is an instance of FSDataInputStream or 
FSInputStream
   
   ```
   if (FSUtils.isGCSFileSystem(fs)) {
     LOG.warn("HoodieLogFileReader 111 aaa " + logFile.getFileName() + " can_name: "
         + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()
         + ". Is wrappedStream instance of fsDataInputStream "
         + (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream)
         + " , is wrappedSTream instance of fsInputStream "
         + (fsDataInputStream.getWrappedStream() instanceof FSInputStream));
     try {
       FSInputStream localFSInputStream =
           (FSInputStream) (((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream());
       inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(),
           new FSDataInputStream(new BufferedFSInputStream(localFSInputStream, bufferSize))), true);
       LOG.warn("HoodieLogFileReader 111 aaa succeeded " + logFile.getFileName());
     } catch (ClassCastException e) {
       LOG.warn("HoodieLogFileReader 111 bbb (aaa failed) " + logFile.getFileName() + " " + e.getCause()
           + ", msg " + e.getMessage());
       // if we cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream,
       // fall back to using it as is
       LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with "
           + "GCSFileSystem, falling back to original fsDataInputStream");
       inputStreamLocal = fsDataInputStream;
     }
   } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
     LOG.warn("HoodieLogFileReader 222 " + logFile.getFileName() + " "
         + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
     inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
         new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     LOG.warn("HoodieLogFileReader 222 completed ");
   } else {
     // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
     // need to wrap in another BufferedFSInputStream to make bufferSize work?
     LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
     inputStreamLocal = fsDataInputStream;
   }
   ```
   
   Output from the run:
   "HoodieLogFileReader 111 aaa .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 can_name: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream. Is wrappedStream instance of fsDataInputStream false , is wrappedSTream instance of fsInputStream true" 
   
   "HoodieLogFileReader 111 bbb (aaa failed) .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 null, msg com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop.fs.FSDataInputStream" 
   
   "Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original fsDataInputStream" 
   .
   .
   After this, the seek ran into an issue: 
   Caused by: java.io.EOFException: Invalid seek offset: position value (1584) must be between 0 and 1584 for 'gs://dataproc-staging-us-
   .
   .
   .
   
   
   So, since we hit the ClassCastException, we fell back to the original stream 
and never used the SchemeAwareFSDataInputStream class at all, and hence ran into 
the seek issue. 
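
For readers who have not looked at the wrapper, here is a rough sketch of the kind of behavior the fallback path loses. Everything in it is an assumption for illustration: `StrictStream` is a hypothetical stream that rejects a seek to its own length (as the EOFException above suggests GCS does), and `EofTolerantStream` is a hypothetical wrapper that retries one byte earlier. It is not the actual SchemeAwareFSDataInputStream code.

```java
import java.io.EOFException;
import java.io.IOException;

class SeekAtEofSketch {
  // Hypothetical minimal seek contract, not the Hadoop Seekable interface.
  interface Seekable {
    void seek(long target) throws IOException;

    long getPos();
  }

  // Assumption: valid positions are 0 .. length - 1, so seek(length) fails.
  static class StrictStream implements Seekable {
    private final long length;
    private long pos;

    StrictStream(long length) {
      this.length = length;
    }

    @Override
    public void seek(long target) throws IOException {
      if (target < 0 || target >= length) {
        throw new EOFException("Invalid seek offset: " + target);
      }
      pos = target;
    }

    @Override
    public long getPos() {
      return pos;
    }
  }

  // Assumption: on EOFException, retry at target - 1 when tolerance is enabled.
  static class EofTolerantStream implements Seekable {
    private final Seekable delegate;
    private final boolean tolerateEof;

    EofTolerantStream(Seekable delegate, boolean tolerateEof) {
      this.delegate = delegate;
      this.tolerateEof = tolerateEof;
    }

    @Override
    public void seek(long target) throws IOException {
      try {
        delegate.seek(target);
      } catch (EOFException e) {
        if (!tolerateEof) {
          throw e;
        }
        delegate.seek(target - 1);
      }
    }

    @Override
    public long getPos() {
      return delegate.getPos();
    }
  }

  // Returns the position after seeking, or -1 if the seek threw.
  static long positionAfterSeek(Seekable stream, long target) {
    try {
      stream.seek(target);
      return stream.getPos();
    } catch (IOException e) {
      return -1;
    }
  }

  public static void main(String[] args) {
    System.out.println(positionAfterSeek(new StrictStream(1584), 1584)); // -1
    System.out.println(
        positionAfterSeek(new EofTolerantStream(new StrictStream(1584), true), 1584)); // 1583
  }
}
```

The point is only that the tolerant behavior lives in the outer wrapper, so a code path that returns the raw stream cannot benefit from it.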
   
   4th variant: 
   my latest commit w/ the proposed fix.
   
   ```
   if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
     LOG.warn("HoodieLogFileReader 111 start " + logFile.getFileName());
     inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
         new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     LOG.warn("HoodieLogFileReader 111 completed ");
     if (FSUtils.isGCSFileSystem(fs)) {
       LOG.warn("HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream");
       inputStreamLocal = new SchemeAwareFSDataInputStream(inputStreamLocal, true);
     }
   } else {
     // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
     // need to wrap in another BufferedFSInputStream to make bufferSize work?
     LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
     inputStreamLocal = fsDataInputStream;
   }
   ```
   
   Output from the run:
   "HoodieLogFileReader 111 start .7a1a0684-b710-4a44-97c4-4c98b75db8a2-0_20210306142209.log.1_2-55-76" 
   "HoodieLogFileReader 111 completed "
   "HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream"
   
   // No exceptions. all good. 
   
   
   Summary: at some point we came up with two conditions: condition 1 checks 
fsDataInputStream.getWrappedStream() instanceof FSInputStream, and condition 2 
caters to GCSFileSystem. But in reality GCSFileSystem also satisfies condition 1, 
i.e. fsDataInputStream.getWrappedStream() instanceof FSInputStream holds for it 
too. Hence the proposed fix: wrap with BufferedFSInputStream under condition 1, 
and additionally wrap with SchemeAwareFSDataInputStream when the file system is 
GCS. 
    
   
   
   
   
   
   
   

