Dan,

The first point you've raised is a known issue: when a job is stopped, the
unfinished part files are not transitioned to the finished state. This is
mentioned in the docs as Important Note 2 [1], and the fix is waiting
on FLIP-46 [2]. That section of the docs also includes some S3-specific
warnings, but nothing pertaining to managing credentials. For your second
point, perhaps [3] will help.
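
For reference, the credential settings described in [3] look roughly like
this in flink-conf.yaml. This is only a sketch: the values are placeholders,
and whether these keys are picked up in an embedded setup without an
explicit FileSystem.initialize() call is exactly the open question here.

```yaml
# Placeholder values; substitute your own credentials.
s3.access-key: your-access-key
s3.secret-key: your-secret-key
```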

Regards,
David

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials


On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse <d...@netzooid.com> wrote:

> First, let me say, Flink is super cool - thanks everyone for making my
> life easier in a lot of ways! Wish I had this 10 years ago....
>
> Onto the fun stuff: I am attempting to use the StreamingFileSink with S3.
> Note that Flink is embedded in my app, not running as a standalone cluster.
>
> I am having a few problems, which I have illustrated in the small test
> case below.
>
> 1) After my job finishes, data never gets committed to S3. Looking through
> the code, I've noticed that data gets flushed to disk, but the multi-part
> upload is never finished. Even though my data doesn't hit the min part
> size, I would expect that if my job ends, my data should get uploaded since
> the job is 100% done.
>
> I am also having problems with data not being uploaded while the job is
> running, but I haven't been able to distill that down to a simple test
> case, so I thought I'd start here.
>
> 2) The S3 Filesystem does not pull credentials from the Flink
> Configuration when running in embedded mode. I have a workaround for this,
> but it is ugly. If you comment out the line in the test case which talks
> about this workaround, you will end up with a "java.net.SocketException:
> Host is down".
>
> Can anyone shed light on these two issues? Thanks!
>
> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> import org.junit.jupiter.api.Test;
>
> public class S3Test {
>     @Test
>     public void whyDoesntThisWork() throws Exception {
>         Configuration configuration = new Configuration();
>         configuration.setString("state.backend", MemoryStateBackendFactory.class.getName());
>         configuration.setString("s3.access.key", "****");
>         configuration.setString("s3.secret.key", "****");
>
>         // If I don't do this, the S3 filesystem never gets the credentials
>         FileSystem.initialize(configuration, null);
>
>         LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>
>         StreamingFileSink<String> s3 = StreamingFileSink
>                 .forRowFormat(new Path("s3://bucket/"), new SimpleStringEncoder<String>())
>                 .build();
>
>         env.fromElements("string1", "string2")
>             .addSink(s3);
>
>         env.execute();
>
>         System.out.println("Done");
>     }
> }
>
>
> --
> Dan Diephouse
> @dandiep
>
