[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

chris snow updated FLINK-8543:
------------------------------
    Description: 
I'm hitting an issue with the BucketingSink in one of my streaming jobs.

 
{code:java}
return new BucketingSink<Tuple2<String, Object>>(path)
         .setWriter(writer)
         .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
{code}
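
For context, the writer in this snippet is the AvroKeyValueSinkWriter that appears in the stack trace further down. Here is a rough sketch of how the sink is wired up; the bucket name and the "yyyy-MM-dd--HHmm" format string are placeholders I've filled in (the format at least matches the bucket directories in the log output below), so treat it as an illustration rather than the exact job code:

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// Sketch only -- approximates the wiring implied by the snippet above and the
// log output below (landingzone/2018-02-01--1240/...); not the actual job code.
public class SinkWiringSketch {
    public static BucketingSink<Tuple2<String, Object>> buildSink(Writer<Tuple2<String, Object>> writer) {
        // Placeholder bucket; the real job writes to an S3-compatible object store via s3a://.
        String path = "s3a://my-bucket/landingzone";
        return new BucketingSink<Tuple2<String, Object>>(path)
                .setWriter(writer)
                // "yyyy-MM-dd--HHmm" matches the bucket directories seen in the logs (2018-02-01--1240).
                .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>("yyyy-MM-dd--HHmm"));
    }
}
{code}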
 

I can see that a few files have failed to upload to S3:

!Screen Shot 2018-01-30 at 18.34.51.png!   

The Flink console output shows an exception being thrown by S3AOutputStream, so I grabbed the S3AOutputStream class from my cluster and added some extra logging to the checkOpen() method to record the 'key' just before the exception is thrown:

 
{code:java}
/*
 * Decompiled with CFR.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream
extends OutputStream {
    private final OutputStream backupStream;
    private final File backupFile;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final String key;
    private final Progressable progress;
    private final S3AFileSystem fs;
    public static final Logger LOG = S3AFileSystem.LOG;

    public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, Progressable progress) throws IOException {
        this.key = key;
        this.progress = progress;
        this.fs = fs;
        this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
        LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
(Object)key, (Object)this.backupFile);
        this.backupStream = new BufferedOutputStream(new 
FileOutputStream(this.backupFile));
    }

    void checkOpen() throws IOException {
        if (!this.closed.get()) return;

        // vvvvvv-- Additional logging --vvvvvvv

        LOG.error("OutputStream for key '{}' closed.", (Object)this.key);


        throw new IOException("Output Stream closed");
    }

    @Override
    public void flush() throws IOException {
        this.checkOpen();
        this.backupStream.flush();
    }

    @Override
    public void close() throws IOException {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.backupStream.close();
        LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
(Object)this.key);
        try {
            ObjectMetadata om = this.fs.newObjectMetadata(this.backupFile.length());
            Upload upload = this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
            ProgressableProgressListener listener = new ProgressableProgressListener(this.fs, this.key, upload, this.progress);
            upload.addProgressListener((ProgressListener)listener);
            upload.waitForUploadResult();
            listener.uploadCompleted();
            this.fs.finishedWrite(this.key);
        }
        catch (InterruptedException e) {
            throw (InterruptedIOException)new InterruptedIOException(e.toString()).initCause(e);
        }
        catch (AmazonClientException e) {
            throw S3AUtils.translateException("saving output", this.key, e);
        }
        finally {
            if (!this.backupFile.delete()) {
                LOG.warn("Could not delete temporary s3a file: {}", (Object)this.backupFile);
            }
            super.close();
        }
        LOG.debug("OutputStream for key '{}' upload complete", 
(Object)this.key);
    }

    @Override
    public void write(int b) throws IOException {
        this.checkOpen();
        this.backupStream.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.checkOpen();
        this.backupStream.write(b, off, len);
    }

    static {
    }
}
{code}
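
The guard logic in the decompiled class is straightforward: close() flips the closed flag (and then uploads the temp file), and any later flush() or write() goes through checkOpen() and throws. To make the ordering easy to see in isolation, here is a minimal stand-alone model of just that guard; it's illustration code I wrote, not the Hadoop class, and it fails the same way once flush() is called after close():

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal model of the checkOpen() guard above -- illustration only, not the Hadoop class.
public class ClosedGuardDemo extends OutputStream {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    private void checkOpen() throws IOException {
        if (closed.get()) {
            throw new IOException("Output Stream closed");
        }
    }

    @Override
    public void write(int b) throws IOException {
        checkOpen();
        // the real class writes to the backing temp file here
    }

    @Override
    public void flush() throws IOException {
        checkOpen();
    }

    @Override
    public void close() {
        // the real class starts the S3 upload once the flag is set
        closed.getAndSet(true);
    }

    public static void main(String[] args) throws IOException {
        ClosedGuardDemo out = new ClosedGuardDemo();
        out.write(1);   // fine while the stream is open
        out.close();    // flag set, upload would begin (S3AOutputStream#close)
        out.flush();    // throws java.io.IOException: Output Stream closed
    }
}
{code}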
 

You can see from this additional log output that the S3AOutputStream#close() method *appears* to be called before the S3AOutputStream#flush() method:

 
{code:java}
2018-02-01 12:42:20,698 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 128497 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Finished write to landingzone/2018-02-01--1240/_part-0-0.in-progress
2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - object_delete_requests += 1  ->  3

vvvvv- close() is called here? -vvvvv

2018-02-01 12:42:21,212 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' upload complete

vvvvv- flush() is called here? -vvvvv

2018-02-01 12:42:21,212 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' closed.

2018-02-01 12:42:21,212 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb).
2018-02-01 12:42:21,214 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb) switched from RUNNING to FAILED.
TimerException{java.io.IOException: Output Stream closed}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Output Stream closed
        at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83)
        at org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89)
        at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
        at java.io.DataOutputStream.flush(DataOutputStream.java:123)
        at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
        at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
        at org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
        at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
        at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
        at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
        at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
        ... 7 more
{code}
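
My reading of this (reconstructed from the logs, so treat it as a hypothesis) is that the S3AOutputStream has already been closed and its upload finished by the time BucketingSink.closeCurrentPartFile() calls AvroKeyValueSinkWriter.close(), so the Avro DataFileWriter's final flush lands on a stream that is no longer open. The same flush-after-close pattern is easy to reproduce with plain JDK streams, without any Flink, Avro or Hadoop classes:

{code:java}
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

// Illustration of the suspected ordering: the innermost stream is closed first,
// then a wrapper further up the chain flushes its buffered data into it.
public class FlushAfterCloseDemo {
    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("flush-after-close", ".bin");
        tmp.deleteOnExit();

        // stand-in for S3AOutputStream
        FileOutputStream inner = new FileOutputStream(tmp);
        // stand-in for the buffered writer chain seen in the stack trace
        DataOutputStream outer = new DataOutputStream(new BufferedOutputStream(inner));

        outer.write(new byte[1024]); // data sits in the BufferedOutputStream, not yet written through

        inner.close();               // underlying stream closed first, like the "upload complete" line above
        outer.flush();               // buffered bytes are now pushed into a closed stream -> IOException
    }
}
{code}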



> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --------------------------------------------------------------------------
>
>                 Key: FLINK-8543
>                 URL: https://issues.apache.org/jira/browse/FLINK-8543
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.4.0
>         Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1
> Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>            Reporter: chris snow
>            Priority: Major
>         Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
