[jira] [Commented] (FLINK-8497) KafkaConsumer throws NPE if topic doesn't exist

2018-04-25 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16451826#comment-16451826
 ] 

chris snow commented on FLINK-8497:
---

Sorry for the late response.  Yes, please feel free to work on this ticket 
[~alexey.lesnov] 

> KafkaConsumer throws NPE if topic doesn't exist
> ---
>
> Key: FLINK-8497
> URL: https://issues.apache.org/jira/browse/FLINK-8497
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: chris snow
>Assignee: Aleksei Lesnov
>Priority: Minor
>
> If I accidentally set the kafka consumer with a topic that doesn't exist:
> {code:java}
> FlinkKafkaConsumer011 kafkaConsumer = new FlinkKafkaConsumer011(
>"does_not_exist",
> new JSONKeyValueDeserializationSchema(false),
> properties
> );
> DataStream input = env.addSource(kafkaConsumer);{code}
> Flink throws an NPE:
> {code:java}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748){code}
> Maybe Flink could throw an IllegalStateException("Topic not found")?
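A minimal sketch of the kind of guard that could replace the NPE inside 
Kafka09PartitionDiscoverer.getAllPartitionsForTopics() (class and method names come from the 
stack trace above; the snippet assumes that KafkaConsumer.partitionsFor() returns null for an 
unknown topic, and the surrounding variable names are illustrative, not the actual fix that 
shipped):
{code:java}
// Hypothetical guard inside getAllPartitionsForTopics(List<String> topics):
List<KafkaTopicPartition> partitions = new LinkedList<>();
for (String topic : topics) {
    List<PartitionInfo> partitionsForTopic = kafkaConsumer.partitionsFor(topic);
    if (partitionsForTopic == null) {
        // partitionsFor() returns null when the topic does not exist, so fail fast
        // with a descriptive message instead of an NPE further down.
        throw new RuntimeException(
                "Could not fetch partitions for topic " + topic + ". Does the topic exist?");
    }
    for (PartitionInfo partitionInfo : partitionsForTopic) {
        partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    }
}
{code}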



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8939) Provide better support for saving streaming data to s3

2018-03-25 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413404#comment-16413404
 ] 

chris snow commented on FLINK-8939:
---

Ah, that makes sense if this is related to eventual consistency. I was using 
IBM Cloud Object Storage S3, which is immediately consistent.

> Provide better support for saving streaming data to s3
> --
>
> Key: FLINK-8939
> URL: https://issues.apache.org/jira/browse/FLINK-8939
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Streaming Connectors
>Reporter: chris snow
>Priority: Major
> Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png
>
>
> Flink seems to struggle saving data to s3 due to the lack of a truncate 
> method, and in my test this resulted in lots of files with a .valid-length 
> suffix.
> I’m using a bucketing sink:
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
> .setWriter(writer)
> .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));{code}
>  
> !18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png!
> See also, the discussion in the comments here: 
> https://issues.apache.org/jira/browse/FLINK-8543?filter=-2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8939) Provide better support for saving streaming data to s3

2018-03-25 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413170#comment-16413170
 ] 

chris snow commented on FLINK-8939:
---

I can’t recall seeing that issue @yanxiaobin

> Provide better support for saving streaming data to s3
> --
>
> Key: FLINK-8939
> URL: https://issues.apache.org/jira/browse/FLINK-8939
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Streaming Connectors
>Reporter: chris snow
>Priority: Major
> Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png
>
>
> Flink seems to struggle saving data to s3 due to the lack of a truncate 
> method, and in my test this resulted in lots of files with a .valid-length 
> suffix.
> I’m using a bucketing sink:
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
> .setWriter(writer)
> .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));{code}
>  
> !18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png!
> See also, the discussion in the comments here: 
> https://issues.apache.org/jira/browse/FLINK-8543?filter=-2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8939) Provide better support for saving streaming data to s3

2018-03-14 Thread chris snow (JIRA)
chris snow created FLINK-8939:
-

 Summary: Provide better support for saving streaming data to s3
 Key: FLINK-8939
 URL: https://issues.apache.org/jira/browse/FLINK-8939
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: chris snow
 Attachments: 18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png

Flink seems to struggle saving data to s3 due to the lack of a truncate method, 
and in my test this resulted in lots of files with a .valid-length suffix.

I’m using a bucketing sink:
{code:java}
return new BucketingSink<Tuple2<String, Object>>(path)
    .setWriter(writer)
    .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));{code}
 

!18652BC0-DD67-42C3-9A33-12F7BC10F9F3.png!

See also, the discussion in the comments here: 
https://issues.apache.org/jira/browse/FLINK-8543?filter=-2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-3588) Add a streaming (exactly-once) JDBC connector

2018-03-03 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384724#comment-16384724
 ] 

chris snow edited comment on FLINK-3588 at 3/3/18 4:38 PM:
---

This appears to have been implemented?
{code:java}
/**
 * An at-least-once Table sink for JDBC.
 *
 * The mechanisms of Flink guarantees delivering messages at-least-once to 
this sink (if
 * checkpointing is enabled). However, one common use case is to run idempotent 
queries
 * (e.g., REPLACE or INSERT OVERWRITE) to upsert into 
the database and
 * achieve exactly-once semantic.
 */
{code}

https://github.com/apache/flink/blob/release-1.4/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
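For reference, a minimal sketch of the idempotent-upsert pattern that the Javadoc above 
describes, using the 1.4 JDBCAppendTableSink builder (the driver, URL, table, and column 
types are made up for the example; REPLACE INTO is MySQL syntax):
{code:java}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

// Because REPLACE INTO is idempotent, replaying a batch after a failure produces
// the same rows, so at-least-once delivery effectively yields exactly-once results.
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:3306/example")
    .setQuery("REPLACE INTO transactions (id, amount) VALUES (?, ?)")
    .setParameterTypes(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO)
    .build();
{code}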


was (Author: snowch):
This appears to have been implemented?
{code:java}
/**
 * An at-least-once Table sink for JDBC.
 *
 * The mechanisms of Flink guarantees delivering messages at-least-once to 
this sink (if
 * checkpointing is enabled). However, one common use case is to run idempotent 
queries
 * (e.g., REPLACE or INSERT OVERWRITE) to upsert into 
the database and
 * achieve exactly-once semantic.
 */

https://github.com/apache/flink/blob/release-1.4/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
{code}

> Add a streaming (exactly-once) JDBC connector
> -
>
> Key: FLINK-3588
> URL: https://issues.apache.org/jira/browse/FLINK-3588
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3588) Add a streaming (exactly-once) JDBC connector

2018-03-03 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384724#comment-16384724
 ] 

chris snow commented on FLINK-3588:
---

This appears to have been implemented?
{code:java}
/**
 * An at-least-once Table sink for JDBC.
 *
 * The mechanisms of Flink guarantees delivering messages at-least-once to 
this sink (if
 * checkpointing is enabled). However, one common use case is to run idempotent 
queries
 * (e.g., REPLACE or INSERT OVERWRITE) to upsert into 
the database and
 * achieve exactly-once semantic.
 */

https://github.com/apache/flink/blob/release-1.4/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
{code}

> Add a streaming (exactly-once) JDBC connector
> -
>
> Key: FLINK-3588
> URL: https://issues.apache.org/jira/browse/FLINK-3588
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-27 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378211#comment-16378211
 ] 

chris snow commented on FLINK-8543:
---

Thanks for fixing this [~aljoscha]!
{quote}The comments are correct, S3 simply does not support truncate. The 
reason why we need it is that we need to trim files in case of recovering from 
a failure to get them back to the state they had when we did the last 
checkpoint. To ensure exactly-once semantics. The solution we came up with for 
filesystems that don't support truncate is the .valid-length files because we 
simply cannot take back what was already written but you're right that the 
client reading those files needs to understand them.

Coming up with a better solution will be one of the goals of the 1.6 release 
cycle, though.
{quote}
Is there a Jira ticket I can track to follow the changes that will happen in 
1.6 to provide better handling for s3 sinks?

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output S

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-22 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372534#comment-16372534
 ] 

chris snow commented on FLINK-8543:
---

 

[~aljoscha] - what are your thoughts on this? 

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newObjectMetadata(this.backupFile.length());
> Upload upload = 
> this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
> ProgressableProgressListener listener = new 
> ProgressableP

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371882#comment-16371882
 ] 

chris snow commented on FLINK-8543:
---

So all files being accompanied by a .valid-length file is expected behavior when 
saving to s3?

If so, unless client applications can understand and use the .valid-length 
files (which I don’t think will be the case), I don’t think this functionality 
makes sense with s3?
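For anyone reading those buckets downstream, a minimal sketch of what handling the marker 
involves (assumptions: the Hadoop FileSystem API is on the classpath, and the marker file 
contains the valid byte count as a plain decimal string; paths and class names are 
illustrative):
{code:java}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ValidLengthAwareReader {

    /** Returns only the prefix of the part file that the .valid-length marker vouches for. */
    static byte[] readValidPrefix(FileSystem fs, Path partFile, Path validLengthFile) throws IOException {
        long validLength;
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(validLengthFile), StandardCharsets.UTF_8))) {
            // the marker holds the length of the consistent prefix as a decimal string
            validLength = Long.parseLong(reader.readLine().trim());
        }
        byte[] valid = new byte[(int) validLength];
        try (FSDataInputStream in = fs.open(partFile)) {
            in.readFully(0, valid); // bytes past validLength may belong to an unfinished checkpoint
        }
        return valid;
    }
}
{code}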

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newObjectMetadata(th

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371855#comment-16371855
 ] 

chris snow commented on FLINK-8543:
---

Does Flink running on EMR using S3 have the same issue?  If not, what AWS S3 
API calls and filesystem implementation are used?

 

IBM COS S3 supports a subset of the most common AWS S3 API operations 
(https://ibm-public-cos.github.io/crs-docs/api-reference#copy-an-object).

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newO

[jira] [Comment Edited] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371882#comment-16371882
 ] 

chris snow edited comment on FLINK-8543 at 2/21/18 7:42 PM:


So all files being accompanied by a .valid-length file is expected behavior when 
saving to s3?

If so, unless client applications can understand and use the .valid-length 
files (which I don’t think will be the case), I don’t think this functionality 
makes sense with s3? I.e. am I trying to do something with Flink that it wasn’t 
designed to do?


was (Author: snowch):
So all files being accompanied by a .valid-length file is expected behavior when 
saving to s3?

If so, unless client applications can understand and use the .valid-length 
files (which I don’t think will be the case), I don’t think this functionality 
makes sense with s3?

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOp

[jira] [Comment Edited] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371613#comment-16371613
 ] 

chris snow edited comment on FLINK-8543 at 2/21/18 4:59 PM:


My apologies, the commented-out close() didn't get deployed due to a compile 
error when including my own version of the class, so I removed the close() call 
with a bytecode modifier instead, i.e.

*class*: org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter
{code}
public void close() throws IOException {
    if (this.keyValueWriter != null) {
        this.keyValueWriter.close();
    }
}
{code}

It is running without exception now; however, every file in COS S3 is still 
accompanied by a .valid-length file:

{code}
...
transactions_load_20180221/2018-02-21--1454/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:10
transactions_load_20180221/2018-02-21--1454/part-0-0                 22.6 KB     21/02/2018 14:56:06
transactions_load_20180221/2018-02-21--1455/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:05
transactions_load_20180221/2018-02-21--1455/part-0-0                 14.5 KB     21/02/2018 14:56:03
...
{code}

I configured log output to debug level on Bucketing sink and see this in the 
logs:

{code}
2018-02-21 16:45:18,993 DEBUG 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - Truncate is 
not supported.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:599)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:362)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Not implemented by the 
S3AFileSystem FileSystem implementation
at org.apache.hadoop.fs.FileSystem.truncate(FileSystem.java:1364)
... 15 more
{code}

Is this the reason for the .valid-length files? If so, is it because the Java 
code doesn't support truncate, or because the filesystem (IBM COS S3) doesn't 
support it? It looks as though it is not supported by my Hadoop library:

*File*: 
./hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
{code}
public boolean truncate(Path f, long newLength) throws IOException {
    throw new UnsupportedOperationException("Not implemented by the " +
        getClass().getSimpleName() + " FileSystem implementation");
}
{code}

and

{code}
$ grep 'truncate' 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
[[string not found]]
{code}
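To make the reflection path concrete, here is a simplified sketch of the sort of probe 
BucketingSink#reflectTruncate performs (the real implementation also checks the Hadoop 
version and writes a scratch file; this only shows why an UnsupportedOperationException 
ends up meaning "fall back to .valid-length markers"):
{code:java}
import java.lang.reflect.Method;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class TruncateProbe {

    /**
     * Returns the truncate(Path, long) method if the given FileSystem really supports it,
     * or null if the lookup or trial invocation fails (S3AFileSystem on Hadoop 2.7 throws
     * UnsupportedOperationException, as shown in the stack trace above).
     */
    static Method probeTruncate(FileSystem fs, Path scratchFile) {
        try {
            Method truncate = FileSystem.class.getMethod("truncate", Path.class, long.class);
            truncate.invoke(fs, scratchFile, 2L); // trial call on a small scratch file
            return truncate;
        } catch (Exception e) {
            // lookup failed or the invocation threw; the sink must then fall back to
            // writing .valid-length companion files on recovery
            return null;
        }
    }
}
{code}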



was (Author: snowch):
My apologies, the commented-out close() didn't get deployed due to a compile 
error when including my own version of the class, so I removed the close() call 
with a bytecode modifier instead, i.e.

*class*: org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter
{code}
public void close() throws IOException {
    if (this.keyValueWriter != null) {
        this.keyValueWriter.close();
    }
}
{code}

It is running without exception now; however, every file in COS S3 is still 
accompanied by a .valid-length file:

{code}
...
transactions_load_20180221/2018-02-21--1454/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:10
transactions_load_20180221/2018-02-21--1454/part-0-0                 22.6 KB     21/02/2018 14:56:06
transactions_load_20180221/2018-02-21--1455/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:05
transactions_load_20180221/2018-02-21--1455/part-0-0                 14.5 KB     21/02/2018 14:56:03
...
{code}

I configured log output to deb

[jira] [Comment Edited] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371613#comment-16371613
 ] 

chris snow edited comment on FLINK-8543 at 2/21/18 4:54 PM:


My apologies, the commented-out close() didn't get deployed due to a compile 
error when including my own version of the class, so I removed the close() call 
with a bytecode modifier instead, i.e.

*class*: org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter
{code}
public void close() throws IOException {
    if (this.keyValueWriter != null) {
        this.keyValueWriter.close();
    }
}
{code}

It is running without exception now; however, every file in COS S3 is still 
accompanied by a .valid-length file:

{code}
...
transactions_load_20180221/2018-02-21--1454/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:10
transactions_load_20180221/2018-02-21--1454/part-0-0                 22.6 KB     21/02/2018 14:56:06
transactions_load_20180221/2018-02-21--1455/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:05
transactions_load_20180221/2018-02-21--1455/part-0-0                 14.5 KB     21/02/2018 14:56:03
...
{code}

I configured log output to debug level on Bucketing sink and see this in the 
logs:

{code}
2018-02-21 16:45:18,993 DEBUG 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - Truncate is 
not supported.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:599)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:362)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Not implemented by the 
S3AFileSystem FileSystem implementation
at org.apache.hadoop.fs.FileSystem.truncate(FileSystem.java:1364)
... 15 more
{code}

Is this the reason for the .valid-length files? If so, is it because the Java 
code doesn't support truncate, or because the filesystem (IBM COS S3) doesn't 
support it? It looks as though it is not supported by my Hadoop library:

*File*: 
./hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
{code}
public boolean truncate(Path f, long newLength) throws IOException {
    throw new UnsupportedOperationException("Not implemented by the " +
        getClass().getSimpleName() + " FileSystem implementation");
}
{code}




was (Author: snowch):
My apologies, the commented-out close() didn't get deployed due to a compile 
error when including my own version of the class, so I removed the close() call 
with a bytecode modifier instead, i.e.

*class*: org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter
{code}
public void close() throws IOException {
    if (this.keyValueWriter != null) {
        this.keyValueWriter.close();
    }
}
{code}

It is running without exception now; however, every file in COS S3 is still 
accompanied by a .valid-length file:

{code}
...
transactions_load_20180221/2018-02-21--1454/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:10
transactions_load_20180221/2018-02-21--1454/part-0-0                 22.6 KB     21/02/2018 14:56:06
transactions_load_20180221/2018-02-21--1455/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:05
transactions_load_20180221/2018-02-21--1455/part-0-0                 14.5 KB     21/02/2018 14:56:03
...
{code}

I configured log output to debug level on Bucketing sink and see this in the 
logs:

{code}
2018-02-21 16:45:18,993 DEBUG 
org.apache.flink.streaming.connectors.fs.bucketi

[jira] [Comment Edited] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371613#comment-16371613
 ] 

chris snow edited comment on FLINK-8543 at 2/21/18 4:48 PM:


My apologies, the commented-out close() didn't get deployed due to a compile 
error when including my own version of the class, so I removed the close() call 
with a bytecode modifier instead, i.e.

*class*: org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter
{code}
public void close() throws IOException {
    if (this.keyValueWriter != null) {
        this.keyValueWriter.close();
    }
}
{code}

It is running without exception now; however, every file in COS S3 is still 
accompanied by a .valid-length file:

{code}
...
transactions_load_20180221/2018-02-21--1454/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:10
transactions_load_20180221/2018-02-21--1454/part-0-0                 22.6 KB     21/02/2018 14:56:06
transactions_load_20180221/2018-02-21--1455/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:05
transactions_load_20180221/2018-02-21--1455/part-0-0                 14.5 KB     21/02/2018 14:56:03
...
{code}

I configured log output to debug level on Bucketing sink and see this in the 
logs:

{code}
2018-02-21 16:45:18,993 DEBUG 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - Truncate is 
not supported.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:599)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:362)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Not implemented by the 
S3AFileSystem FileSystem implementation
at org.apache.hadoop.fs.FileSystem.truncate(FileSystem.java:1364)
... 15 more
{code}

Is this the reason for the .valid-length files? If so, is it because the Java 
code doesn't support truncate, or because the filesystem (IBM COS S3) doesn't 
support it?


was (Author: snowch):
My apologies, the commented-out close() didn't get deployed due to a compile 
error when including my own version of the class, so I removed the close() call 
with a bytecode modifier instead, i.e.

*class*: org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter
{code}
public void close() throws IOException {
    if (this.keyValueWriter != null) {
        this.keyValueWriter.close();
    }
}
{code}

It is running without exception now; however, every file in COS S3 is still 
accompanied by a .valid-length file:

{code}
...
transactions_load_20180221/2018-02-21--1454/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:10
transactions_load_20180221/2018-02-21--1454/part-0-0                 22.6 KB     21/02/2018 14:56:06
transactions_load_20180221/2018-02-21--1455/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:05
transactions_load_20180221/2018-02-21--1455/part-0-0                 14.5 KB     21/02/2018 14:56:03
...
{code}

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonwork

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371613#comment-16371613
 ] 

chris snow commented on FLINK-8543:
---

My apologies, the commented-out close() didn't get deployed due to a compile 
error when including my own version of the class, so I removed the close() call 
with a bytecode modifier instead, i.e.

*class*: org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter
{code}
public void close() throws IOException {
    if (this.keyValueWriter != null) {
        this.keyValueWriter.close();
    }
}
{code}

It is running without exception now; however, every file in COS S3 is still 
accompanied by a .valid-length file:

{code}
...
transactions_load_20180221/2018-02-21--1454/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:10
transactions_load_20180221/2018-02-21--1454/part-0-0                 22.6 KB     21/02/2018 14:56:06
transactions_load_20180221/2018-02-21--1455/_part-0-0.valid-length   7.0 bytes   21/02/2018 14:56:05
transactions_load_20180221/2018-02-21--1455/part-0-0                 14.5 KB     21/02/2018 14:56:03
...
{code}
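As an aside, one way to do that kind of method-body replacement with a bytecode tool is 
sketched below using Javassist (the comment doesn't say which modifier was actually used, 
so this is purely illustrative; the class and field names are taken from the snippet above):
{code:java}
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;

public class PatchAvroKeyValueSinkWriter {
    public static void main(String[] args) throws Exception {
        ClassPool pool = ClassPool.getDefault();
        CtClass cc = pool.get("org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter");

        // Swap in a close() body that no longer calls super.close().
        CtMethod close = cc.getDeclaredMethod("close");
        close.setBody("{ if (this.keyValueWriter != null) { this.keyValueWriter.close(); } }");

        // Write the patched .class file; place the output directory ahead of the
        // original jar on the classpath (or repack the jar) so it wins at runtime.
        cc.writeFile("patched-classes");
    }
}
{code}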

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
>  

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371485#comment-16371485
 ] 

chris snow commented on FLINK-8543:
---

Unfortunately, commenting out the call to super.close() doesn't stop the 
exception:

{code}
@Override
public void close() throws IOException {
    // See https://issues.apache.org/jira/browse/FLINK-8543?focusedCommentId=16371445&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16371445
    //super.close(); //the order is important since super.close flushes inside
    if (keyValueWriter != null) {
        keyValueWriter.close();
    }
}
{code}

On the consistency note, hopefully, IBM COS S3 will be ok as it is immediately 
consistent?

> COS is ‘immediately consistent’ for data and ‘eventually consistent’ for 
> usage accounting.
Source: https://ibm-public-cos.github.io/crs-docs/faq

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @

[jira] [Comment Edited] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371403#comment-16371403
 ] 

chris snow edited comment on FLINK-8543 at 2/21/18 1:33 PM:


Sorry for the delay. I've added the debug statements:
{code:java}
public S3AOutputStream(Configuration conf,
  S3AFileSystem fs,
  String key,
  Progressable progress)
  throws IOException {
this.key = key;
this.progress = progress;
this.fs = fs;


backupFile = fs.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);

LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
key, backupFile);

this.backupStream = new BufferedOutputStream(
new FileOutputStream(backupFile));
  }


  // ** print extra debug output **

  void printStackTrace() {
long threadId = Thread.currentThread().getId();
StringBuilder sb = new StringBuilder();
sb.append("Thread id: " + Thread.currentThread().getId() + " key: " + key);
for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
  sb.append("\n " + ste);
}
// I'm being lazy - log the stacktrace as an error so it will get logged
// without having to change the logger configuration
LOG.error(sb.toString());
  }

  /**
   * Check for the filesystem being open.
   * @throws IOException if the filesystem is closed.
   */
  void checkOpen() throws IOException {
if (closed.get()) {
  printStackTrace();
  throw new IOException(
  "Output Stream closed.  Thread id: " + 
Thread.currentThread().getId() + " key: " + key);
}
  }

  @Override
  public void flush() throws IOException {
checkOpen();
backupStream.flush();
  }

  @Override
  public void close() throws IOException {

printStackTrace();

if (closed.getAndSet(true)) {
  return;
}

backupStream.close();
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);

try {
  final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
  Upload upload = fs.putObject(
  fs.newPutObjectRequest(
  key,
  om,
  backupFile));
  ProgressableProgressListener listener =
  new ProgressableProgressListener(fs, key, upload, progress);
  upload.addProgressListener(listener);

  upload.waitForUploadResult();
  listener.uploadCompleted();
  // This will delete unnecessary fake parent directories
  fs.finishedWrite(key);
} catch (InterruptedException e) {
  throw (InterruptedIOException) new InterruptedIOException(e.toString())
  .initCause(e);
} catch (AmazonClientException e) {
  throw translateException("saving output", key , e);
} finally {
  if (!backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", backupFile);
  }
  super.close();
}
LOG.debug("OutputStream for key '{}' upload complete", key);
  }

  @Override
  public void write(int b) throws IOException {
checkOpen();
backupStream.write(b);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
checkOpen();
backupStream.write(b, off, len);
  }

}
{code}
And here is one of the yarn logs:
{code:java}
Log Contents:
2018-02-21 12:43:11,323 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
- 

2018-02-21 12:43:11,326 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  Starting YARN TaskManager (Version: 1.4.0, Rev:3a9d9f2, 
Date:06.12.2017 @ 11:08:40 UTC)
2018-02-21 12:43:11,326 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  OS current user: clsadmin
2018-02-21 12:43:12,162 WARN  org.apache.hadoop.util.NativeCodeLoader   
- Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  Current Hadoop/Kerberos user: clsadmin
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 
1.8/25.112-b15
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  Maximum heap size: 406 MiBytes
2018-02-21 12:43:12,264 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  JAVA_HOME: /usr/jdk64/jdk1.8.0_112
2018-02-21 12:43:12,266 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  Hadoop version: 2.7.3.2.6.2.0-205
... 
omitted for brevity
... 
2018-02-21 12:43:12,267 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
- 
--

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371405#comment-16371405
 ] 

chris snow commented on FLINK-8543:
---

[~aljoscha] The failures seem to be happening for all files.  I'm running my 
code on a cloud cluster that I provisioned just for this testing.  I'm happy to 
share credentials if you want to take a more detailed look.  My code is open 
source and the cluster will be destroyed after these tests, so there's nothing 
sensitive on it.

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
>

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-21 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371403#comment-16371403
 ] 

chris snow commented on FLINK-8543:
---

Sorry for the delay. I've added the debug statements:
{code:java}
public S3AOutputStream(Configuration conf,
  S3AFileSystem fs,
  String key,
  Progressable progress)
  throws IOException {
this.key = key;
this.progress = progress;
this.fs = fs;


backupFile = fs.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);

LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
key, backupFile);

this.backupStream = new BufferedOutputStream(
new FileOutputStream(backupFile));
  }


  // ** print extra debug output **

  void printStackTrace() {
long threadId = Thread.currentThread().getId();
StringBuilder sb = new StringBuilder();
sb.append("Thread id: " + Thread.currentThread().getId() + " key: " + key);
for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
  sb.append("\n " + ste);
}
// I'm being lazy - log the stacktrace as an error so it will get logged
// without having to change the logger configuration
LOG.error(sb.toString());
  }

  /**
   * Check for the filesystem being open.
   * @throws IOException if the filesystem is closed.
   */
  void checkOpen() throws IOException {
if (closed.get()) {
  printStackTrace();
  throw new IOException(
  "Output Stream closed.  Thread id: " + 
Thread.currentThread().getId() + " key: " + key);
}
  }

  @Override
  public void flush() throws IOException {
checkOpen();
backupStream.flush();
  }

  @Override
  public void close() throws IOException {

printStackTrace();

if (closed.getAndSet(true)) {
  return;
}

backupStream.close();
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);

try {
  final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
  Upload upload = fs.putObject(
  fs.newPutObjectRequest(
  key,
  om,
  backupFile));
  ProgressableProgressListener listener =
  new ProgressableProgressListener(fs, key, upload, progress);
  upload.addProgressListener(listener);

  upload.waitForUploadResult();
  listener.uploadCompleted();
  // This will delete unnecessary fake parent directories
  fs.finishedWrite(key);
} catch (InterruptedException e) {
  throw (InterruptedIOException) new InterruptedIOException(e.toString())
  .initCause(e);
} catch (AmazonClientException e) {
  throw translateException("saving output", key , e);
} finally {
  if (!backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", backupFile);
  }
  super.close();
}
LOG.debug("OutputStream for key '{}' upload complete", key);
  }

  @Override
  public void write(int b) throws IOException {
checkOpen();
backupStream.write(b);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
checkOpen();
backupStream.write(b, off, len);
  }

}
{code}
And here is one of the yarn logs:
{code:java}
Log Contents:
2018-02-21 12:43:11,323 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
- 

2018-02-21 12:43:11,326 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  Starting YARN TaskManager (Version: 1.4.0, Rev:3a9d9f2, 
Date:06.12.2017 @ 11:08:40 UTC)
2018-02-21 12:43:11,326 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  OS current user: clsadmin
2018-02-21 12:43:12,162 WARN  org.apache.hadoop.util.NativeCodeLoader   
- Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  Current Hadoop/Kerberos user: clsadmin
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 
1.8/25.112-b15
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  Maximum heap size: 406 MiBytes
2018-02-21 12:43:12,264 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  JAVA_HOME: /usr/jdk64/jdk1.8.0_112
2018-02-21 12:43:12,266 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
-  Hadoop version: 2.7.3.2.6.2.0-205
... 
omitted for brevity
... 
2018-02-21 12:43:12,267 INFO  org.apache.flink.yarn.YarnTaskManagerRunner   
- 

2018-02-21 12:43:12,270 INFO  

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-20 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16370123#comment-16370123
 ] 

chris snow commented on FLINK-8543:
---

[~aljoscha] - I’m using AvroKeyValueSinkWriter 
[https://github.com/ibm-cloud-streaming-retail-demo/flink-on-iae-messagehub-to-s3/blob/master/src/main/java/com/ibm/cloud/flink/StreamingJob.java#L186]

 

[~ste...@apache.org] - HDP 2.6.2: 
https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction

 

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> 

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-14 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364895#comment-16364895
 ] 

chris snow commented on FLINK-8543:
---

I didn’t see any errors or suspicious entries in the logs prior to this error.

I’ll try running again in a few days with the extra stacktrace logging.

Thanks, [~aljoscha]

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newObjectMetadata(this.backupFile.length());
> Upload upload = 
> this.fs.putObject(this.fs.newP

[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-11 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359852#comment-16359852
 ] 

chris snow commented on FLINK-8543:
---

I'm hoping that I can get access to an internal cluster that will give me root 
access and hence more debugging capabilities.  I’m thinking of adding some code 
to print out the stacktrace and the thread ID from the flush() and close() 
methods.

Are there any other areas that you would like me to investigate?

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> 

[jira] [Created] (FLINK-8618) Add S3 to the list of sinks on the delivery guarantees page

2018-02-09 Thread chris snow (JIRA)
chris snow created FLINK-8618:
-

 Summary: Add S3 to the list of sinks on the delivery guarantees 
page
 Key: FLINK-8618
 URL: https://issues.apache.org/jira/browse/FLINK-8618
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: chris snow


It would be good to add S3 to the list of sinks.

Maybe S3 inherits the delivery guarantee properties from HDFS, in which case it 
could be added next to HDFS? E.g.

HDFS/S3 rolling sink | exactly once | Implementation depends on Hadoop version

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-07 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356524#comment-16356524
 ] 

chris snow commented on FLINK-8543:
---

One thing to note - I've used the standard Flink 1.4.0 download for Hadoop 2.7, 
but I am running on a Hortonworks-based Hadoop environment.  I'm not sure what 
impact this may have, but I thought it was worth mentioning.

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newObjectMetadata(this.back

[jira] [Updated] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-01 Thread chris snow (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chris snow updated FLINK-8543:
--
Description: 
I'm hitting an issue with my BucketingSink from a streaming job.

 
{code:java}
return new BucketingSink<Tuple2<String, Object>>(path)
 .setWriter(writer)
 .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
{code}
 

I can see that a few files have run into issues with uploading to S3:

!Screen Shot 2018-01-30 at 18.34.51.png!   

The Flink console output is showing an exception being thrown by 
S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster and 
added some additional logging to the checkOpen() method to log the 'key' just 
before the exception is thrown:

 
{code:java}
/*
 * Decompiled with CFR.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream
extends OutputStream {
private final OutputStream backupStream;
private final File backupFile;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final String key;
private final Progressable progress;
private final S3AFileSystem fs;
public static final Logger LOG = S3AFileSystem.LOG;

public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
Progressable progress) throws IOException {
this.key = key;
this.progress = progress;
this.fs = fs;
this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
(Object)key, (Object)this.backupFile);
this.backupStream = new BufferedOutputStream(new 
FileOutputStream(this.backupFile));
}

void checkOpen() throws IOException {
if (!this.closed.get()) return;

// vv-- Additional logging --vvv

LOG.error("OutputStream for key '{}' closed.", (Object)this.key);


throw new IOException("Output Stream closed");
}

@Override
public void flush() throws IOException {
this.checkOpen();
this.backupStream.flush();
}

@Override
public void close() throws IOException {
if (this.closed.getAndSet(true)) {
return;
}
this.backupStream.close();
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
(Object)this.key);
try {
ObjectMetadata om = 
this.fs.newObjectMetadata(this.backupFile.length());
Upload upload = 
this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
ProgressableProgressListener listener = new 
ProgressableProgressListener(this.fs, this.key, upload, this.progress);
upload.addProgressListener((ProgressListener)listener);
upload.waitForUploadResult();
listener.uploadCompleted();
this.fs.finishedWrite(this.key);
}
catch (InterruptedException e) {
throw (InterruptedIOException)new 
InterruptedIOException(e.toString()).initCause(e);
}
catch (AmazonClientException e) {
throw S3AUtils.translateException("saving output", this.key, e);
}
finally {
if (!this.backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", 
(Object)this.backupFile);
}
super.close();
}
LOG.debug("OutputStream for key '{}' upload complete", 
(Object)this.key);
}

@Override
public void write(int b) throws IOException {
this.checkOpen();
this.backupStream.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
this.checkOpen();
this.backupStream.write(b, off, len);
}

static {
}
}
{code}
 

You can see from this additional log output that the S3AOutputStream#close() 
method **appears** to be called before the S3AOutputStream#flush() method:

 
{code:java}
2018-02-01 1

[jira] [Updated] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-01 Thread chris snow (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chris snow updated FLINK-8543:
--
 Attachment: Screen Shot 2018-01-30 at 18.34.51.png
Description: 
I'm hitting an issue with my BucketingSink from a streaming job.

 
{code:java}
return new BucketingSink<Tuple2<String, Object>>(path)
 .setWriter(writer)
 .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
{code}
 

I can see that a few files have run into issues with uploading to S3:

!Screen Shot 2018-01-30 at 18.34.51.png!   

I've grabbed the S3AOutputStream class from my cluster and added some 
additional logging to the checkOpen() method to log the 'key' just before the 
exception is thrown:

 
{code:java}
/*
 * Decompiled with CFR.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream
extends OutputStream {
private final OutputStream backupStream;
private final File backupFile;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final String key;
private final Progressable progress;
private final S3AFileSystem fs;
public static final Logger LOG = S3AFileSystem.LOG;

public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
Progressable progress) throws IOException {
this.key = key;
this.progress = progress;
this.fs = fs;
this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
(Object)key, (Object)this.backupFile);
this.backupStream = new BufferedOutputStream(new 
FileOutputStream(this.backupFile));
}

void checkOpen() throws IOException {
if (!this.closed.get()) return;

// vv-- Additional logging --vvv

LOG.error("OutputStream for key '{}' closed.", (Object)this.key);


throw new IOException("Output Stream closed");
}

@Override
public void flush() throws IOException {
this.checkOpen();
this.backupStream.flush();
}

@Override
public void close() throws IOException {
if (this.closed.getAndSet(true)) {
return;
}
this.backupStream.close();
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
(Object)this.key);
try {
ObjectMetadata om = 
this.fs.newObjectMetadata(this.backupFile.length());
Upload upload = 
this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
ProgressableProgressListener listener = new 
ProgressableProgressListener(this.fs, this.key, upload, this.progress);
upload.addProgressListener((ProgressListener)listener);
upload.waitForUploadResult();
listener.uploadCompleted();
this.fs.finishedWrite(this.key);
}
catch (InterruptedException e) {
throw (InterruptedIOException)new 
InterruptedIOException(e.toString()).initCause(e);
}
catch (AmazonClientException e) {
throw S3AUtils.translateException("saving output", this.key, e);
}
finally {
if (!this.backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", 
(Object)this.backupFile);
}
super.close();
}
LOG.debug("OutputStream for key '{}' upload complete", 
(Object)this.key);
}

@Override
public void write(int b) throws IOException {
this.checkOpen();
this.backupStream.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
this.checkOpen();
this.backupStream.write(b, off, len);
}

static {
}
}
{code}
 

You can see from this additional log output that the S3AOutputStream#close() 
method **appears** to be called before the S3AOutputStream#flush() method:

 
{code:java}
2018-02-01 12:42:20,698 DEBUG org.apache.h

[jira] [Created] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-01 Thread chris snow (JIRA)
chris snow created FLINK-8543:
-

 Summary: Output Stream closed at 
org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
 Key: FLINK-8543
 URL: https://issues.apache.org/jira/browse/FLINK-8543
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.4.0
 Environment: IBM Analytics Engine - 
[https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]

The cluster is based on Hortonworks Data Platform 2.6.2. The following 
components are made available.

Apache Spark 2.1.1 Hadoop 2.7.3
Apache Livy 0.3.0
Knox 0.12.0
Ambari 2.5.2
Anaconda with Python 2.7.13 and 3.5.2 
Jupyter Enterprise Gateway 0.5.0 
HBase 1.1.2 * 
Hive 1.2.1 *
Oozie 4.2.0 *
Flume 1.5.2 * 
Tez 0.7.0 * 
Pig 0.16.0 * 
Sqoop 1.4.6 * 
Slider 0.92.0 * 
Reporter: chris snow


I'm hitting an issue with my BucketingSink from a streaming job.

 
{code:java}
return new BucketingSink<Tuple2<String, Object>>(path)
 .setWriter(writer)
 .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
{code}
 

I can see that a few files have run into issues with uploading to S3:

  !Screen Shot 2018-01-30 at 18.34.51.png!

I've grabbed the S3AOutputStream class from my cluster and added some 
additional logging to the checkOpen() method to log the 'key' just before the 
exception is thrown:

 
{code:java}
/*
 * Decompiled with CFR.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream
extends OutputStream {
private final OutputStream backupStream;
private final File backupFile;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final String key;
private final Progressable progress;
private final S3AFileSystem fs;
public static final Logger LOG = S3AFileSystem.LOG;

public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
Progressable progress) throws IOException {
this.key = key;
this.progress = progress;
this.fs = fs;
this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
(Object)key, (Object)this.backupFile);
this.backupStream = new BufferedOutputStream(new 
FileOutputStream(this.backupFile));
}

void checkOpen() throws IOException {
if (!this.closed.get()) return;

// vv-- Additional logging --vvv

LOG.error("OutputStream for key '{}' closed.", (Object)this.key);


throw new IOException("Output Stream closed");
}

@Override
public void flush() throws IOException {
this.checkOpen();
this.backupStream.flush();
}

@Override
public void close() throws IOException {
if (this.closed.getAndSet(true)) {
return;
}
this.backupStream.close();
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
(Object)this.key);
try {
ObjectMetadata om = 
this.fs.newObjectMetadata(this.backupFile.length());
Upload upload = 
this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
ProgressableProgressListener listener = new 
ProgressableProgressListener(this.fs, this.key, upload, this.progress);
upload.addProgressListener((ProgressListener)listener);
upload.waitForUploadResult();
listener.uploadCompleted();
this.fs.finishedWrite(this.key);
}
catch (InterruptedException e) {
throw (InterruptedIOException)new 
InterruptedIOException(e.toString()).initCause(e);
}
catch (AmazonClientException e) {
throw S3AUtils.translateException("saving output", this.key, e);
}
finally {
if (!this.backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", 
(Object)this.backupFile);
}
 

[jira] [Updated] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints

2018-01-25 Thread chris snow (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chris snow updated FLINK-8513:
--
Description: 
It would be useful if the documentation provided information on connecting to 
non-AWS S3 endpoints when using presto.  For example:

 

You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
{{flink-conf.yaml}}:
{code:java}
s3.access-key: your-access-key 
s3.secret-key: your-secret-key{code}
If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
endpoint in Flink's {{flink-conf.yaml}}:
{code:java}
s3.endpoint: your-endpoint-hostname{code}
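
For illustration only, a minimal sketch of the combined {{flink-conf.yaml}} entries might look like this (the endpoint hostname below is a hypothetical placeholder, not a real value):
{code:java}
s3.access-key: your-access-key
s3.secret-key: your-secret-key
# hypothetical endpoint for a non-AWS S3 service such as IBM Cloud Object Storage
s3.endpoint: s3.example-object-storage.cloud.example.com{code}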

 

Source: [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md]

 

  was:
It would be useful if the documentation provided information on connecting to 
non-AWS S3 endpoints when using presto.  For example:

 

You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
{{flink-conf.yaml}}:

{{s3.access-key: your-access-key s3.secret-key: your-secret-key }}++

If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
endpoint in Flink's {{flink-conf.yaml}}:

{{s3.endpoint: your-endpoint-hostname }}

 

Source: https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md

 


> Add documentation for connecting to non-AWS S3 endpoints
> 
>
> Key: FLINK-8513
> URL: https://issues.apache.org/jira/browse/FLINK-8513
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: chris snow
>Priority: Trivial
>
> It would be useful if the documentation provided information on connecting to 
> non-AWS S3 endpoints when using presto.  For example:
>  
> 
> You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
> {{flink-conf.yaml}}:
> {code:java}
> s3.access-key: your-access-key 
> s3.secret-key: your-secret-key{code}
> If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
> Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
> endpoint in Flink's {{flink-conf.yaml}}:
> {code:java}
> s3.endpoint: your-endpoint-hostname{code}
> 
>  
> Source: 
> [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints

2018-01-25 Thread chris snow (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chris snow updated FLINK-8513:
--
Description: 
It would be useful if the documentation provided information on connecting to 
non-AWS S3 endpoints when using presto.  For example:

 

You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
{{flink-conf.yaml}}:

{{s3.access-key: your-access-key s3.secret-key: your-secret-key }}++

If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
endpoint in Flink's {{flink-conf.yaml}}:

{{s3.endpoint: your-endpoint-hostname }}

 

Source: https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md

 

  was:It would be useful if the documentation provided information on 
connecting to non-AWS S3 endpoints when using presto.


> Add documentation for connecting to non-AWS S3 endpoints
> 
>
> Key: FLINK-8513
> URL: https://issues.apache.org/jira/browse/FLINK-8513
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: chris snow
>Priority: Trivial
>
> It would be useful if the documentation provided information on connecting to 
> non-AWS S3 endpoints when using presto.  For example:
>  
> 
> You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
> {{flink-conf.yaml}}:
> {{s3.access-key: your-access-key s3.secret-key: your-secret-key }}++
> If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
> Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
> endpoint in Flink's {{flink-conf.yaml}}:
> {{s3.endpoint: your-endpoint-hostname }}
> 
>  
> Source: https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints

2018-01-25 Thread chris snow (JIRA)
chris snow created FLINK-8513:
-

 Summary: Add documentation for connecting to non-AWS S3 endpoints
 Key: FLINK-8513
 URL: https://issues.apache.org/jira/browse/FLINK-8513
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: chris snow


It would be useful if the documentation provided information on connecting to 
non-AWS S3 endpoints when using presto.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8497) KafkaConsumer throws NPE if topic doesn't exist

2018-01-23 Thread chris snow (JIRA)
chris snow created FLINK-8497:
-

 Summary: KafkaConsumer throws NPE if topic doesn't exist
 Key: FLINK-8497
 URL: https://issues.apache.org/jira/browse/FLINK-8497
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.0
Reporter: chris snow


If I accidentally set the kafka consumer with a topic that doesn't exist:
{code:java}
FlinkKafkaConsumer011<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer011<>(
    "does_not_exist",
    new JSONKeyValueDeserializationSchema(false),
    properties
);
DataStream<ObjectNode> input = env.addSource(kafkaConsumer);{code}
Flink throws NPE
{code:java}
Caused by: java.lang.NullPointerException
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748){code}
Maybe Flink could throw an IllegalStateException("Topic not found")?
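
A minimal sketch of how such a guard might look, assuming the NPE comes from the topic metadata lookup returning null for an unknown topic (the class and helper below are hypothetical, not Flink's actual implementation):
{code:java}
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class TopicExistenceCheck {

    // Hypothetical helper: partitionsFor() may return null when the topic does not
    // exist (and auto-creation is disabled), so translate that into an explicit
    // exception instead of letting a NullPointerException surface later.
    static List<PartitionInfo> partitionsOrFail(KafkaConsumer<?, ?> consumer, String topic) {
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        if (partitions == null || partitions.isEmpty()) {
            throw new IllegalStateException("Topic not found: " + topic);
        }
        return partitions;
    }
}
{code}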



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)