Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-07 Thread Dan Hill
I was able to get finer-grained logs showing.  I switched from
-Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my
larger test case, I was hitting a silent log4j error.  When I created a
small test case to just test logging, I received a log4j error.

Here is a tar with the info logs for:
- (test-nojoin.log) this one works as expected
- (test-join.log) this does not work as expected

I don't see an obvious issue just by scanning the logs.  I'll take a deeper
look in 9 hours.




On Wed, Oct 7, 2020 at 8:28 PM Dan Hill  wrote:

> Switching to junit4 did not help.
>
> If I make a request to the url returned from
> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
> I get
> {"errors":["Not found."]}.  I'm not sure if this is intentional.
>
>
>
>
> On Tue, Oct 6, 2020 at 4:16 PM Dan Hill  wrote:
>
>> @Aljoscha - Thanks!  That setup let me fix the hacky absolute path
>> reference.  However, the actual log calls are not printing to the console.
>> Only errors appear in my terminal window and the test logs.  Maybe console
>> logger does not work for this junit setup.  I'll see if the file version
>> works.
>>
>> On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> What Aljoscha suggested is what works for us!
>>>
>>> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
>>> wrote:
>>>
 Hi Dan,

 to make the log properties file work this should do it: assuming the
 log4j.properties is in //src/main/resources. You will need a
 BUILD.bazel
 in that directory that has only the line
 "exports_files(["log4j.properties"]). Then you can reference it in your
 test via "resources = ["//src/main/resources:log4j.properties"],". Of
 course you also need to have the right log4j deps (or slf4j if you're
 using that)

 Hope that helps!

 Aljoscha

 On 07.10.20 00:41, Dan Hill wrote:
 > I'm trying to use Table API for my job.  I'll soon try to get a test
 > working for my stream job.
 > - I'll parameterize so I can have different sources and sinks for
 tests.
 > How should I mock out a Kafka source?  For my test, I was planning on
 > changing the input to be from a temp file (instead of Kafka).
 > - What's a good way of forcing a watermark using the Table API?
 >
 >
 > On Tue, Oct 6, 2020 at 3:35 PM Dan Hill 
 wrote:
 >
 >> Thanks!
 >>
 >> Great to know.  I copied this junit5-jupiter-starter-bazel
 >> <
 https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
 rule
 >> into my repository (I don't think junit5 is supported directly with
 >> java_test yet).  I tried a few ways of bundling `log4j.properties`
 into the
 >> jar and didn't get them to work.  My current iteration hacks the
 >> log4j.properties file as an absolute path.  My failed attempts would
 spit
 >> an error saying log4j.properties file was not found.  This route
 finds it
 >> but the log properties are not used for the java logger.
 >>
 >> Are there a better set of rules to use for junit5?
 >>
 >> # build rule
 >> java_junit5_test(
 >>  name = "tests",
 >>  srcs = glob(["*.java"]),
 >>  test_package = "ai.promoted.logprocessor.batch",
 >>  deps = [...],
 >>  jvm_flags =
 >>
 ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
 >> )
 >>
 >> # log4j.properties
 >> status = error
 >> name = Log4j2PropertiesConfig
 >> appenders = console
 >> appender.console.type = Console
 >> appender.console.name = LogToConsole
 >> appender.console.layout.type = PatternLayout
 >> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
 >> rootLogger.level = info
 >> rootLogger.appenderRefs = stdout
 >> rootLogger.appenderRef.stdout.ref = LogToConsole
 >>
 >> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
 >> austin.caw...@gmail.com> wrote:
 >>
 >>> Oops, this is actually the JOIN issue thread [1]. Guess I should
 revise
 >>> my previous "haven't had issues" statement hah. Sorry for the spam!
 >>>
 >>> [1]:
 >>>
 apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
 >>>
 >>> On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
 >>> austin.caw...@gmail.com> wrote:
 >>>
  Unless it's related to this issue[1], which was w/ my JOIN and time
  characteristics, though not sure that applies for batch.
 
  Best,
  Austin
 
  [1]:
 
 apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Du

Re: Stateful Functions + ML model prediction

2020-10-07 Thread Tzu-Li (Gordon) Tai
Hi John,

Thanks a lot for opening the JIRA ticket! If you are interested in
contributing that to StateFun, I'm also happy to guide you with the
contribution.

On Mon, Oct 5, 2020 at 10:24 PM John Morrow 
wrote:

> Thanks for the response Gordon, and that FlinkForward presentation - it's
> been very helpful.
>
> I put in a JIRA ticket for it:
> https://issues.apache.org/jira/browse/FLINK-19507
>
> I did find this page:
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/flink-connectors.html
> and there are source/sink connectors for Pulsar (
> https://github.com/streamnative/pulsar-flink) - I'm guessing that's how I
> should approach using Pulsar as an ingress/egress?
>

That is correct! The `SourceFunctionSpec` and `SinkFunctionSpec` are a
means for users to bridge existing Flink sources and sinks to StateFun
ingress / egress.

The downside to that approach is that, even if you're purely using remote
functions, you'd still have to provide an embedded module to add ingresses
/ egresses this way.
Eventually it would be best (if we have several users requesting Pulsar) to
have native support like Kinesis and Kafka, so that users can define them
textually in `module.yaml` definition files, but the approach you pointed out
definitely works for the time being.
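
Roughly, such an embedded module would look like the sketch below. The class
and package names are from memory of the current SDK, so treat this as a
sketch rather than copy-paste code, and the Pulsar source itself is only a
placeholder for whatever SourceFunction the pulsar-flink connector provides:

```
import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Registered via META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
public class PulsarIoModule implements StatefulFunctionModule {

  private static final IngressIdentifier<String> PULSAR_INGRESS =
      new IngressIdentifier<>(String.class, "example", "pulsar-in");

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    // Placeholder: whatever SourceFunction the pulsar-flink connector gives you.
    SourceFunction<String> pulsarSource = createPulsarSource();

    // Bridge the existing Flink source into a StateFun ingress.
    binder.bindIngress(new SourceFunctionSpec<>(PULSAR_INGRESS, pulsarSource));

    // You would also bind a Router (binder.bindIngressRouter(...)) to address the
    // incoming records to your functions, and a SinkFunctionSpec on the egress side.
  }

  private SourceFunction<String> createPulsarSource() {
    throw new UnsupportedOperationException("plug in the Pulsar source here");
  }
}
```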

Cheers,
Gordon


>
> Cheers,
> John.
>
> --
> *From:* Tzu-Li (Gordon) Tai 
> *Sent:* Monday 5 October 2020 03:21
> *To:* John Morrow ; user  >
> *Subject:* Re: Stateful Functions + ML model prediction
>
> Hi John,
>
> It is definitely possible to use Apache Pulsar with StateFun. Could you
> open a JIRA ticket for that?
> It would be nice to see how much interest we can gather on adding that as
> a new IO module, and consider adding native support for Pulsar in future
> releases.
>
> If you are already using StateFun and want to start using Pulsar as an
> ingress/egress already for current versions, there's also a way to do that
> right now.
> If that's the case, please let me know and I'll try to provide some
> guidelines on how to achieve that.
>
> Cheers,
> Gordon
>
>
> On Fri, Oct 2, 2020, 1:38 AM John Morrow 
> wrote:
>
> Hi Flink Users,
>
> I was watching Tzu-Li Tai's talk on stateful functions from Flink Forward (
> https://www.youtube.com/watch?v=tuSylBadNSo) which mentioned that Kafka &
> Kinesis are supported, and looking at
> https://repo.maven.apache.org/maven2/org/apache/flink/ I can see IO
> packages for those two: statefun-kafka-io & statefun-kinesis-io
>
>
> Is it possible to use Apache Pulsar as a Statefun ingress & egress?
>
> Thanks,
> John.
>
> --
> *From:* John Morrow 
> *Sent:* Wednesday 23 September 2020 11:37
> *To:* Igal Shilman 
> *Cc:* user 
> *Subject:* Re: Stateful Functions + ML model prediction
>
> Thanks very much Igal - that sounds like a good solution!
>
> I'm new to StateFun so I'll have to dig into it a bit more, but this
> sounds like a good direction.
>
> Thanks again,
> John.
>
> --
> *From:* Igal Shilman 
> *Sent:* Wednesday 23 September 2020 09:06
> *To:* John Morrow 
> *Cc:* user 
> *Subject:* Re: Stateful Functions + ML model prediction
>
> Hi John,
>
> Thank you for sharing your interesting use case!
>
> Let me start from your second question:
>
> Are stateful functions available to all Flink jobs within a cluster?
>
>
> Yes, the remote functions are some logic exposed behind an HTTP endpoint,
> and Flink would forward any message addressed to them via an HTTP request.
> The way StateFun works is, for every invocation, StateFun would attach the
> necessary context (any previous state for a key, and the message) to the
> HTTP request.
> So practically speaking the same remote function can be contacted by
> different Jobs, as the remote functions are effectively stateless.
>
>  Does this sound like a good use case for stateful functions?
>
>
> The way I would approach this is, I would consider moving the
> business rules and the enrichment to the remote function.
> This would:
>
> a) Eliminate the need for a broadcast stream, you can simply deploy a new
> version of the remote function container, as they can be independently
> restarted (without the need to restart the Flink job that contacts them)
> b) You can perform the enrichment immediately without going through
> a RichAsyncFunction, as StateFun, by default, invokes many remote
> functions in parallel (but never for the same key)
> c) You can contact the remote service that hosts the machine learning
> model, or even load the model in the remote function's process on startup.
>
> So, in kubernetes terms:
>
> 1. You would need a set of pods (a deployment) that are able to serve HTTP
> traffic and expose a StateFun endpoint.
> 2. You would need a separate deployment for Flink that runs a StateFun job
> 3. The StateFun job would need to know how to contact these pods, so you
> would also need a kubernetes service (or a LoadBalancer) that
> balances the req

Re: Statefun + Confluent Fully-managed Kafka

2020-10-07 Thread Tzu-Li (Gordon) Tai
Hi Hezekiah,

I've confirmed that the Kafka properties set in the module specification
file (module.yaml) are indeed correctly being parsed and used to construct
the internal Kafka clients.
StateFun / Flink does not alter or modify the properties.

So, this should be something wrong with your property settings, and causing
the Kafka client itself to not pick up the `sasl.jaas.config` property
value.
From the resolved producer config in the logs, it looks like your
`sasl.jaas.config` is null, but all other properties are being picked up
correctly.

Please check your properties again, and make sure their keys are correct
and values conform to the JAAS config formats.
For starters, there's a typo in your `sasl.mechanism` config: you've
mistyped an extra 's' (`sasl.mechanisms`).

I've verified that the following properties will work, with SASL JAAS
config being picked up correctly:

```
egresses:
  - egress:
  meta:
type: statefun.kafka.io/generic-egress
id: example/greets
  spec:
address: 
deliverySemantic:
  type: exactly-once
  transactionTimeoutMillis: 10
properties:
  - security.protocol: SASL_SSL
  - sasl.mechanism: PLAIN
  - sasl.jaas.config:
org.apache.kafka.common.security.plain.PlainLoginModule required
username="USERNAME" password="PASSWORD";
  - ssl.endpoint.identification.algorithm: https
```
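
For comparison, the same settings on a plain Java Kafka client would look
roughly like the snippet below (credentials are placeholders); the key really
is the singular `sasl.mechanism`, and an unknown key such as `sasl.mechanisms`
is just logged as an unrecognized config and otherwise ignored:

```
import java.util.Properties;

class KafkaSaslProps {
  static Properties saslSslProperties() {
    Properties props = new Properties();
    props.setProperty("security.protocol", "SASL_SSL");
    props.setProperty("sasl.mechanism", "PLAIN"); // singular
    props.setProperty(
        "sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"USERNAME\" password=\"PASSWORD\";");
    props.setProperty("ssl.endpoint.identification.algorithm", "https");
    return props;
  }
}
```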

Cheers,
Gordon

On Wed, Oct 7, 2020 at 11:36 PM Till Rohrmann  wrote:

> Hi Hezekiah, thanks for reporting this issue. I am pulling Gordon and Igal
> in who might be able to help you with this problem.
>
> Cheers,
> Till
>
> On Wed, Oct 7, 2020 at 3:56 PM hezekiah maina 
> wrote:
>
>> Hi,
>>
>> I'm trying to use Stateful Functions with Kafka as my ingress and egress.
>> I'm using the Confluent fully-managed Kafka and I'm having a challenge
>> adding my authentication details in the module.yaml file.
>> Here is my current config details:
>> version: "1.0"
>> module:
>>   meta:
>> type: remote
>>   spec:
>> functions:
>>   - function:
>>   meta:
>> kind: http
>> type: example/greeter
>>   spec:
>> endpoint: 
>> states:
>>   - seen_count
>> maxNumBatchRequests: 500
>> timeout: 2min
>> ingresses:
>>   - ingress:
>>   meta:
>> type: statefun.kafka.io/routable-protobuf-ingress
>> id: example/names
>>   spec:
>> address: 
>> consumerGroupId: statefun-consumer-group
>> topics:
>>   - topic: names
>> typeUrl: com.googleapis/example.GreetRequest
>> targets:
>>   - example/greeter
>> properties:
>>   - bootstrap.servers:
>>   - security.protocol: SASL_SSL
>>   - sasl.mechanism: PLAIN
>>   - sasl.jaas.config:
>> org.apache.kafka.common.security.plain.PlainLoginModule required
>> username="USERNAME" password="PASSWORD";
>>   - ssl.endpoint.identification.algorithm: https
>> egresses:
>>   - egress:
>>   meta:
>> type: statefun.kafka.io/generic-egress
>> id: example/greets
>>   spec:
>> address: 
>> deliverySemantic:
>>   type: exactly-once
>>   transactionTimeoutMillis: 10
>> properties:
>>   - bootstrap.servers: 
>>   - security.protocol: SASL_SSL
>>   - sasl.mechanisms: PLAIN
>>   - sasl.jaas.config:
>> org.apache.kafka.common.security.plain.PlainLoginModule required
>> username="USERNAME" password="PASSWORD";
>>   - ssl.endpoint.identification.algorithm: https
>>
>> After running docker-compose with a master and worker containers I'm
>> getting this error:
>> Could not find a 'KafkaClient' entry in the JAAS configuration. System
>> property 'java.security.auth.login.config' is
>> /tmp/jaas-2846080966990890307.conf
>>
>> The producer config logged :
>> worker_1  | 2020-10-07 13:38:08,489 INFO
>>  org.apache.kafka.clients.producer.ProducerConfig  -
>> ProducerConfig values:
>> worker_1  | acks = 1
>> worker_1  | batch.size = 16384
>> worker_1  | bootstrap.servers = [https://
>> ---.asia-southeast1.gcp.confluent.cloud:9092]
>> worker_1  | buffer.memory = 33554432
>> worker_1  | client.dns.lookup = default
>> worker_1  | client.id =
>> worker_1  | compression.type = none
>> worker_1  | connections.max.idle.ms = 54
>> worker_1  | delivery.timeout.ms = 12
>> worker_1  | enable.idempotence = false
>> worker_1  | interceptor.classes = []
>> worker_1  | key.serializer = class
>> org.apache.kafka.common.serialization.ByteArraySerializer
>> worker_1  | linger.ms = 0
>> worker_1  | max.block.ms = 6
>> worker_1  | max.

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-07 Thread Dan Hill
Switching to junit4 did not help.

If I make a request to the url returned from
MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
I get
{"errors":["Not found."]}.  I'm not sure if this is intentional.




On Tue, Oct 6, 2020 at 4:16 PM Dan Hill  wrote:

> @Aljoscha - Thanks!  That setup let me fix the hacky absolute path
> reference.  However, the actual log calls are not printing to the console.
> Only errors appear in my terminal window and the test logs.  Maybe console
> logger does not work for this junit setup.  I'll see if the file version
> works.
>
> On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> What Aljoscha suggested is what works for us!
>>
>> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> to make the log properties file work this should do it: assuming the
>>> log4j.properties is in //src/main/resources. You will need a BUILD.bazel
>>> in that directory that has only the line
>>> "exports_files(["log4j.properties"]). Then you can reference it in your
>>> test via "resources = ["//src/main/resources:log4j.properties"],". Of
>>> course you also need to have the right log4j deps (or slf4j if you're
>>> using that)
>>>
>>> Hope that helps!
>>>
>>> Aljoscha
>>>
>>> On 07.10.20 00:41, Dan Hill wrote:
>>> > I'm trying to use Table API for my job.  I'll soon try to get a test
>>> > working for my stream job.
>>> > - I'll parameterize so I can have different sources and sinks for tests.
>>> > How should I mock out a Kafka source?  For my test, I was planning on
>>> > changing the input to be from a temp file (instead of Kafka).
>>> > - What's a good way of forcing a watermark using the Table API?
>>> >
>>> >
>>> > On Tue, Oct 6, 2020 at 3:35 PM Dan Hill  wrote:
>>> >
>>> >> Thanks!
>>> >>
>>> >> Great to know.  I copied this junit5-jupiter-starter-bazel
>>> >> <
>>> https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
>>> rule
>>> >> into my repository (I don't think junit5 is supported directly with
>>> >> java_test yet).  I tried a few ways of bundling `log4j.properties`
>>> into the
>>> >> jar and didn't get them to work.  My current iteration hacks the
>>> >> log4j.properties file as an absolute path.  My failed attempts would
>>> spit
>>> >> an error saying log4j.properties file was not found.  This route
>>> finds it
>>> >> but the log properties are not used for the java logger.
>>> >>
>>> >> Are there a better set of rules to use for junit5?
>>> >>
>>> >> # build rule
>>> >> java_junit5_test(
>>> >>  name = "tests",
>>> >>  srcs = glob(["*.java"]),
>>> >>  test_package = "ai.promoted.logprocessor.batch",
>>> >>  deps = [...],
>>> >>  jvm_flags =
>>> >>
>>> ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
>>> >> )
>>> >>
>>> >> # log4j.properties
>>> >> status = error
>>> >> name = Log4j2PropertiesConfig
>>> >> appenders = console
>>> >> appender.console.type = Console
>>> >> appender.console.name = LogToConsole
>>> >> appender.console.layout.type = PatternLayout
>>> >> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
>>> >> rootLogger.level = info
>>> >> rootLogger.appenderRefs = stdout
>>> >> rootLogger.appenderRef.stdout.ref = LogToConsole
>>> >>
>>> >> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
>>> >> austin.caw...@gmail.com> wrote:
>>> >>
>>> >>> Oops, this is actually the JOIN issue thread [1]. Guess I should
>>> revise
>>> >>> my previous "haven't had issues" statement hah. Sorry for the spam!
>>> >>>
>>> >>> [1]:
>>> >>>
>>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
>>> >>>
>>> >>> On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
>>> >>> austin.caw...@gmail.com> wrote:
>>> >>>
>>>  Unless it's related to this issue[1], which was w/ my JOIN and time
>>>  characteristics, though not sure that applies for batch.
>>> 
>>>  Best,
>>>  Austin
>>> 
>>>  [1]:
>>> 
>>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html
>>> 
>>> 
>>>  On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
>>>  austin.caw...@gmail.com> wrote:
>>> 
>>> > Hey Dan,
>>> >
>>> > We use Junit5 and Bazel to run Flink SQL tests on a mini cluster
>>> and
>>> > haven’t had issues, though we’re only testing on streaming jobs.
>>> >
>>> > Happy to help setting up logging with that if you’d like.
>>> >
>>> > Best,
>>> > Austin
>>> >
>>> > On Tue, Oct 6, 2020 at 6:02 PM Dan Hill 
>>> wrote:
>>> >
>>> >> I don't think any of the gotchas apply to me (at the bottom of
>>> this
>>> >> link).
>>> >>
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-minicluster
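
For reference, the JUnit rule from that page boils down to roughly the
following (JUnit 4 style; the slot and TaskManager counts are arbitrary):

```
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public class MiniClusterTestBase {

  // Spins up a small embedded Flink cluster that is shared by all tests in the class.
  @ClassRule
  public static final MiniClusterWithClientResource FLINK_CLUSTER =
      new MiniClusterWithClientResource(
          new MiniClusterResourceConfiguration.Builder()
              .setNumberSlotsPerTaskManager(2)
              .setNumberTaskManagers(1)
              .build());
}
```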

Network issue leading to "No pooled slot available"

2020-10-07 Thread Dan Diephouse
I am now using the S3 StreamingFileSink to send data to an S3 bucket.
If/when the network connection has issues, it seems to put Flink into an
irrecoverable state. Am I understanding this correctly? Any suggestions on
how to troubleshoot / fix?

Here is what I'm observing:

*1. Network is dropped *

*2. S3 connections do not exit gracefully*

2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
not react to cancelling signal for 30 seconds, but is stuck in method:
 java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
java.base@14.0.2
/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
java.base@14.0.2
/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
java.base@14.0.2
/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
java.base@14.0.2/sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
java.base@14.0.2
/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
java.base@14.0.2
/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
java.base@14.0.2
/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
java.base@14.0.2
/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
java.base@14.0.2
/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
app//com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
app//org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)

*3. Tasks do not complete*

2020-10-07 21:00:37.458 ERROR 1 --- [9580107498927).]
o.a.f.runtime.taskexecutor.TaskExecutor  : Task did not exit gracefully
within 180 + seconds.

org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully
within 180 + seconds.
at
org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572)
~[flink-runtime_2.11-1.11.2.jar:1.11.2]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

202

NoResourceAvailableException

2020-10-07 Thread Alexander Semeshchenko


After installing (download & tar zxf) Apache Flink 1.11.1 and running ./bin/flink
run examples/streaming/WordCount.jar, it shows the following message after more or
less 5 min. of trying to submit:  Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout. Please
make sure that the cluster has enough resources. at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
... 45 more Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.TimeoutException at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)

It's the default Flink configuration.

Architecture: x86_64, CPU op-mode(s): 32-bit/64-bit, Byte Order: Little Endian,
CPU(s): 8, On-line CPU(s) list: 0-7, Thread(s) per core: 1, Core(s) per socket: 1

free -g:
              total  used  free  shared  buff/cache  available
Mem:             62     1    23       3          37         57
Swap:             7     0     7

Is there any advice about what is happening?


Re: S3 StreamingFileSink issues

2020-10-07 Thread Dan Diephouse
FYI - I discovered that if I specify the Hadoop compression codec it works
fine. E.g.:

CompressWriters.forExtractor(new
DefaultExtractor()).withHadoopCompression("GzipCodec")

Haven't dug into exactly why yet.
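
For anyone following along, this is roughly how that factory plugs into the
sink; the bucket path is a placeholder and I'm writing this from memory, so
the package names may be slightly off:

```
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class CompressedSink {
  static StreamingFileSink<String> gzipSink() throws Exception {
    // Bulk format: part files roll on checkpoints; the Hadoop GzipCodec handles compression.
    return StreamingFileSink
        .forBulkFormat(
            new Path("s3://bucket/prefix/"),
            CompressWriters.forExtractor(new DefaultExtractor<String>())
                .withHadoopCompression("GzipCodec"))
        .build();
  }
}
```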

On Wed, Oct 7, 2020 at 12:14 PM David Anderson 
wrote:

> Looping in @Kostas Kloudas  who should be able to
> clarify things.
>
> David
>
> On Wed, Oct 7, 2020 at 7:12 PM Dan Diephouse  wrote:
>
>> Thanks! Completely missed that in the docs. It's now working, however
>> it's not working with compression writers. Someone else noted this issue
>> here:
>>
>>
>> https://stackoverflow.com/questions/62138635/flink-streaming-compression-not-working-using-amazon-aws-s3-connector-streaming
>>
>> Looking at the code, I'm not sure I follow the nuances of why sync()
>> doesn't just do a call to flush in RefCountedBufferingFileStream:
>>
>> public void sync() throws IOException {
>> throw new UnsupportedOperationException("S3RecoverableFsDataOutputStream
>> cannot sync state to S3. " +
>> "Use persist() to create a persistent recoverable intermediate point.");
>> }
>>
>> If there are any pointers here on what should happen, happy to submit a
>> patch.
>>
>>
>>
>>
>> On Wed, Oct 7, 2020 at 1:37 AM David Anderson 
>> wrote:
>>
>>> Dan,
>>>
>>> The first point you've raised is a known issue: When a job is stopped,
>>> the unfinished part files are not transitioned to the finished state. This
>>> is mentioned in the docs as Important Note 2 [1], and fixing this is
>>> waiting on FLIP-46 [2]. That section of the docs also includes some
>>> S3-specific warnings, but nothing pertaining to managing credentials.
>>> Perhaps [3] will help.
>>>
>>> Regards,
>>> David
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
>>> [2]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials
>>>
>>>
>>> On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse  wrote:
>>>
 First, let me say, Flink is super cool - thanks everyone for making my
 life easier in a lot of ways! Wish I had this 10 years ago

 Onto the fun stuff: I am attempting to use the StreamingFileSink with
 S3. Note that Flink is embedded in my app, not running as a standalone
 cluster.

 I am having a few problems, which I have illustrated in the small test
 case below.

 1) After my job finishes, data never gets committed to S3. Looking
 through the code, I've noticed that data gets flushed to disk, but the
 multi-part upload is never finished. Even though my data doesn't hit the
 min part size, I would expect that if my job ends, my data should get
 uploaded since the job is 100% done.

 I am also having problems when the job is running not uploading - but I
 haven't been able to distill that down to a simple test case, so I thought
 I'd start here.

 2) The S3 Filesystem does not pull credentials from the Flink
 Configuration when running in embedded mode. I have a workaround for this,
 but it is ugly. If you comment out the line in the test case which talks
 about this workaround, you will end up with a "Java.net.SocketException:
 Host is down"

 Can anyone shed light on these two issues? Thanks!

 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
 import
 org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import
 org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.junit.jupiter.api.Test;

 public class S3Test {
 @Test
 public void whyDoesntThisWork() throws Exception {
 Configuration configuration = new Configuration();
 configuration.setString("state.backend",
 MemoryStateBackendFactory.class.getName());
 configuration.setString("s3.access.key", "");
 configuration.setString("s3.secret.key", "");

 // If I don't do this, the S3 filesystem never gets the
 credentials
 FileSystem.initialize(configuration, null);

 LocalStreamEnvironment env =
 StreamExecutionEnvironment.createLocalEnvironment(1, configuration);

 StreamingFileSink s3 = StreamingFileSink
 .forRowFormat(new Path("s3://bucket/"), new
 SimpleStringEncoder())
 .build();

 env.fromElements("string1", "string2")
 .addSink(s3);

  

Re: S3 StreamingFileSink issues

2020-10-07 Thread David Anderson
Looping in @Kostas Kloudas  who should be able to
clarify things.

David

On Wed, Oct 7, 2020 at 7:12 PM Dan Diephouse  wrote:

> Thanks! Completely missed that in the docs. It's now working, however it's
> not working with compression writers. Someone else noted this issue here:
>
>
> https://stackoverflow.com/questions/62138635/flink-streaming-compression-not-working-using-amazon-aws-s3-connector-streaming
>
> Looking at the code, I'm not sure I follow the nuances of why sync()
> doesn't just do a call to flush in RefCountedBufferingFileStream:
>
> public void sync() throws IOException {
> throw new UnsupportedOperationException("S3RecoverableFsDataOutputStream
> cannot sync state to S3. " +
> "Use persist() to create a persistent recoverable intermediate point.");
> }
>
> If there are any pointers here on what should happen, happy to submit a
> patch.
>
>
>
>
> On Wed, Oct 7, 2020 at 1:37 AM David Anderson 
> wrote:
>
>> Dan,
>>
>> The first point you've raised is a known issue: When a job is stopped,
>> the unfinished part files are not transitioned to the finished state. This
>> is mentioned in the docs as Important Note 2 [1], and fixing this is
>> waiting on FLIP-46 [2]. That section of the docs also includes some
>> S3-specific warnings, but nothing pertaining to managing credentials.
>> Perhaps [3] will help.
>>
>> Regards,
>> David
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials
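
One more thing worth adding for the local test quoted below: with a row format
the pending part files are only finalized on successful checkpoints, so the
test environment also needs checkpointing enabled, roughly like this (the
interval is arbitrary):

```
LocalStreamEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
// Without checkpointing the StreamingFileSink leaves parts in-progress/pending.
env.enableCheckpointing(500L);
```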
>>
>>
>> On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse  wrote:
>>
>>> First, let me say, Flink is super cool - thanks everyone for making my
>>> life easier in a lot of ways! Wish I had this 10 years ago
>>>
>>> Onto the fun stuff: I am attempting to use the StreamingFileSink with
>>> S3. Note that Flink is embedded in my app, not running as a standalone
>>> cluster.
>>>
>>> I am having a few problems, which I have illustrated in the small test
>>> case below.
>>>
>>> 1) After my job finishes, data never gets committed to S3. Looking
>>> through the code, I've noticed that data gets flushed to disk, but the
>>> multi-part upload is never finished. Even though my data doesn't hit the
>>> min part size, I would expect that if my job ends, my data should get
>>> uploaded since the job is 100% done.
>>>
>>> I am also having problems when the job is running not uploading - but I
>>> haven't been able to distill that down to a simple test case, so I thought
>>> I'd start here.
>>>
>>> 2) The S3 Filesystem does not pull credentials from the Flink
>>> Configuration when running in embedded mode. I have a workaround for this,
>>> but it is ugly. If you comment out the line in the test case which talks
>>> about this workaround, you will end up with a "Java.net.SocketException:
>>> Host is down"
>>>
>>> Can anyone shed light on these two issues? Thanks!
>>>
>>> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.core.fs.FileSystem;
>>> import org.apache.flink.core.fs.Path;
>>> import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
>>> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import
>>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>> import org.junit.jupiter.api.Test;
>>>
>>> public class S3Test {
>>> @Test
>>> public void whyDoesntThisWork() throws Exception {
>>> Configuration configuration = new Configuration();
>>> configuration.setString("state.backend",
>>> MemoryStateBackendFactory.class.getName());
>>> configuration.setString("s3.access.key", "");
>>> configuration.setString("s3.secret.key", "");
>>>
>>> // If I don't do this, the S3 filesystem never gets the
>>> credentials
>>> FileSystem.initialize(configuration, null);
>>>
>>> LocalStreamEnvironment env =
>>> StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>>>
>>> StreamingFileSink s3 = StreamingFileSink
>>> .forRowFormat(new Path("s3://bucket/"), new
>>> SimpleStringEncoder())
>>> .build();
>>>
>>> env.fromElements("string1", "string2")
>>> .addSink(s3);
>>>
>>> env.execute();
>>>
>>> System.out.println("Done");
>>> }
>>> }
>>>
>>>
>>> --
>>> Dan Diephouse
>>> @dandiep
>>>
>>
>
> --
> Dan Diephouse
> @dandiep
>


Re: Is it possible that late events are processed before the window?

2020-10-07 Thread Ori Popowski
Thanks

On Wed, Oct 7, 2020 at 7:06 PM Till Rohrmann  wrote:

> Hi Ori,
>
> you are right. Events are being sent down the side output for late events
> if the event's timestamp + the allowed lateness is smaller than the current
> watermark. These events are directly seen by downstream operators which
> consume the side output for late events.
>
> Cheers,
> Till
>
> On Wed, Oct 7, 2020 at 2:32 PM Ori Popowski  wrote:
>
>> After creating a toy example I think that I've got the concept of
>> lateDataOutput wrong.
>>
>> It seems that the lateDataSideOutput has nothing to do with windowing;
>> when events arrive late they'll just go straight to the side output, and
>> there can never be any window firing of the main flow for that specific key.
>>
>> On Wed, Oct 7, 2020 at 2:42 PM Ori Popowski  wrote:
>>
>>> I've made an experiment where I use an evictor on the main window (not
>>> the late one), only to write a debug file when the window fires (I don't
>>> actually evict events, I've made it so I can write a debug object the
>>> moment the window finishes).
>>>
>>> I can see that indeed the late data window fires before the main window,
>>> since the mentioned debug file does not exist, but late events _do_ exist
>>> in the destination.
>>>
>>> Writing this debug object in the evictor eliminates potential problems
>>> that might be due to logic in the process function, and it proves that the
>>> window of the late events indeed fires before the main window.
>>>
>>> Here's an outline of my job:
>>>
>>> val windowedStream = senv
>>>   .addSource(kafkaSource)
>>>   ... // some operators
>>>   // like BoundedOutOfOrdereness but ignore future timestamps
>>>   .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
>>>   ... // some more operators
>>>   .keyingBy { case (meta, _) => meta.toPath }
>>>   .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main"
>>> window
>>>   .sideOutputLateData(lateDataTag)
>>>   .process(new ProcessSession(sessionPlayback, config))
>>> windowedStream
>>>   .map(new SerializeSession(sessionPlayback))
>>>   .addSink(sink)
>>> windowedStream
>>>   .getSideOutput(lateDataTag)
>>>   .keyingBy { case (meta, _) => meta.toPath }
>>>   .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late"
>>> window
>>>   .process(new ProcessSession(sessionPlayback, config, true))
>>>   .map(new SerializeSession(sessionPlayback, late = true))
>>>
>>> So, to repeat the question, is that normal? And if not - how can I fix
>>> this?
>>>
>>> Thanks
>>>
>>> On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski  wrote:
>>>

 I have a job with event-time session window of 30 minutes.

 I output late events to side output, where I have a tumbling processing
 time window of 30 minutes.

 I observe that the late events are written to storage before the "main"
 events.

 I wanted to know if it's normal before digging into the code and
 debugging the problem.

 Thanks

>>>


Re: The file STDOUT does not exist on the TaskExecutor

2020-10-07 Thread sidhant gupta
Hi Till,

I understand that the errors which appear in my logs are not stopping me from
running the job. I am running a Flink session cluster in ECS and have also
configured Graylog to get the container logs, so getting the Docker logs is
also not an issue.
But is there a way to suppress this error, or is there any workaround?

Thanks
Sidhant Gupta

On Wed, Oct 7, 2020, 9:15 PM Till Rohrmann  wrote:

> Hi Sidhant,
>
> when using Flink's Docker image, then the cluster won't create the out
> files. Instead the components will directly write to STDOUT which is
> captured by Kubernetes and can be viewed using `kubectl logs POD_NAME`. The
> error which appears in your logs is not a problem. It is simply the REST
> handler which tries to serve the out files.
>
> Cheers,
> Till
>
> On Wed, Oct 7, 2020 at 5:11 PM 大森林  wrote:
>
>> What's your running mode?
>> If your Flink cluster is in YARN mode, then the output you need has no
>> relation to $FLINK_HOME/logs/*.out
>>
>>
>> -- Original Message --
>> *From:* "sidhant gupta" ;
>> *Sent:* Wednesday, October 7, 2020, 11:33 PM
>> *To:* "大森林";"user";
>> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>>
>> Hi,
>>
>> I'm running flink cluster in ecs. There is a pipeline which creates the
>> job manager and then the task manager using the docker image.
>>
>> Not sure if we would want to restart the cluster in production.
>>
>> Is there any way we can make sure the .out files will be created without
>> restart ?
>>
>> I am able to see the logs in the logs tab but not the stdout logs in the
>> web ui and getting the below mentioned error after running the job.
>>
>> Thanks
>> Sidhant Gupta
>>
>>
>> On Wed, Oct 7, 2020, 8:00 PM 大森林  wrote:
>>
>>> It's easy: just restart your Flink cluster (standalone mode).
>>>
>>> If you run Flink in YARN mode, then the result will be displayed in the
>>> $HADOOP/logs/*.out files
>>>
>>> -- Original Message --
>>> *From:* "sidhant gupta" ;
>>> *Sent:* Wednesday, October 7, 2020, 9:52 PM
>>> *To:* "大森林";
>>> *Cc:* "user";
>>> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>>>
>>> ++ user
>>>
>>> On Wed, Oct 7, 2020, 6:47 PM sidhant gupta  wrote:
>>>
 Hi

 I checked in the $FLINK_HOME/logs. The .out file was not there. Can
 you suggest what should be the action item ?

 Thanks
 Sidhant Gupta


 On Wed, Oct 7, 2020, 7:17 AM 大森林  wrote:

>
> Please check if the .out file is in $FLINK_HOME/logs.
>
> -- Original Message --
> *From:* "sidhant gupta" ;
> *Sent:* Wednesday, October 7, 2020, 1:52 AM
> *To:* "大森林";
> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>
> Hi,
>
> I am just running the docker container as it is by adding just the
> conf/flink.yaml .
> I am not sure if the .out file got deleted. Do we need to expose some
> ports ?
>
> Thanks
> Sidhant Gupta
>
>
>
> On Tue, Oct 6, 2020, 8:51 PM 大森林  wrote:
>
>>
>> Hi, I guess you may have deleted the .out file in $FLINK_HOME/logs.
>> You can just use your default log settings.
>> -- Original Message --
>> *From:* "sidhant gupta" ;
>> *Sent:* Tuesday, October 6, 2020, 10:59 PM
>> *To:* "user";
>> *Subject:* The file STDOUT does not exist on the TaskExecutor
>>
>> Hi,
>>
>> I am running dockerized flink:1.11.0-scala_2.11 container in ecs. I
>> am getting the following error after the job runs:
>>
>> ERROR org.apache.flink.runtime.rest.handler.taskmanager.
>> TaskManagerStdoutFileHandler [] - Unhandled exception.
>> org.apache.flink.util.FlinkException: The file STDOUT does not exist
>> on the TaskExecutor.
>> at org.apache.flink.runtime.taskexecutor.TaskExecutor
>> .lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(
>> CompletableFuture.java:1604) ~[?:1.8.0_262]
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_262]
>>
>> I guess "file" needs to be added in log4j.properties in the docker
>> container e.g. log4j.rootLogger=INFO, file
>> Are there any other properties which needs to be configured in any of
>> the other property files or any jar needs to be added in the */opt/flink
>> *path ?
>> Thanks
>> Sidhant Gupta
>>
>>


Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-07 Thread Dylan Forciea
Actually…. It looks like what I did covers both cases. I’ll see about getting 
some unit tests and documentation updated.

Dylan

From: Dylan Forciea 
Date: Wednesday, October 7, 2020 at 11:47 AM
To: Till Rohrmann , dev 
Cc: Shengkai Fang , "user@flink.apache.org" 
, "j...@apache.org" , Leonard Xu 

Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Ok, I have created FLINK-19522 describing the issue. I have the code I made so 
far checked in at 
https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this 
only fixes the SQL API. It sounds like there may be another change needed for 
the Table API… I’ll look into that and see if I can figure it out on my own 
while they’re out. I will also need to add some unit tests and update some 
documentation to get this ready for a PR.

Thanks,
Dylan

From: Till Rohrmann 
Date: Wednesday, October 7, 2020 at 10:55 AM
To: dev 
Cc: Shengkai Fang , "user@flink.apache.org" 
, "j...@apache.org" , Leonard Xu 

Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Hi Dylan,

thanks for reaching out to the Flink community and excuse our late response. I 
am not an expert for the Table API and its JDBC connector but what you describe 
sounds like a missing feature. Also given that FLINK-12198 enabled this feature 
for the JDBCInputFormat indicates that we might simply need to make it 
configurable from the JdbcTableSource. I am pulling in Jark and Leonard who 
worked on the JdbcTableSource and might help you to get this feature into 
Flink. Their response could take a week because they are currently on vacation 
if I am not mistaken.

What you could already do is to open an issue linking FLINK-12198 and 
describing the problem and your solution proposal.

[1] https://issues.apache.org/jira/browse/FLINK-12198

Cheers,
Till

On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea <dy...@oseberg.io> wrote:
I appreciate it! Let me know if you want me to submit a PR against the issue 
after it is created. It wasn’t a huge amount of code, so it’s probably not a 
big deal if you wanted to redo it.

Thanks,
Dylan

From: Shengkai Fang <fskm...@gmail.com>
Date: Wednesday, October 7, 2020 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Sorry for late response. +1 to support it. I will open a jira about it later.

Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, October 7, 2020 at 9:53 PM:

I hadn’t heard a response on this, so I’m going to expand this to the dev email 
list.



If this is indeed an issue and not my misunderstanding, I have most of a patch 
already coded up. Please let me know, and I can create a JIRA issue and send 
out a PR.



Regards,

Dylan Forciea

Oseberg




From: Dylan Forciea <dy...@oseberg.io>
Date: Thursday, October 1, 2020 at 5:14 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: autoCommit for postgres jdbc streaming in Table/SQL API






Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran 
across an issue with streaming postgres data via the Table/SQL API.



I see that the API has the scan.fetch-size option, but not scan.auto-commit per



https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html
 . I had attempted to load a large table in, but it completely slurped it into 
memory before starting the streaming. I modified the flink source code to add a 
scan.auto-commit

option, and I was then able to immediately start streaming and cut my memory 
usage way down.
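
To make that concrete, here is roughly what the DDL looks like with the option
I added locally; the schema and connection details are made up, and
scan.auto-commit is not part of the released connector:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch only: 'scan.auto-commit' is the option I added locally; everything else
// uses the documented JDBC connector options. Table and URL are placeholders.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

tEnv.executeSql(
    "CREATE TABLE big_table (" +
    "  id BIGINT," +
    "  payload STRING" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:postgresql://localhost:5432/mydb'," +
    "  'table-name' = 'big_table'," +
    "  'scan.fetch-size' = '10000'," +
    "  'scan.auto-commit' = 'false'" +
    ")");
```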



I see in this thread that there was a similar issue resolved for 
JDBCInputFormat in this thread:



http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html
 , but I don’t see a way to utilize that in the Table/SQL API.



Am I missing something on how to pull this off?



Regards,

Dylan Forciea

Oseberg







Re: S3 StreamingFileSink issues

2020-10-07 Thread Dan Diephouse
Thanks! Completely missed that in the docs. It's now working; however, it's
not working with compression writers. Someone else noted this issue here:

https://stackoverflow.com/questions/62138635/flink-streaming-compression-not-working-using-amazon-aws-s3-connector-streaming

Looking at the code, I'm not sure I follow the nuances of why sync()
doesn't just do a call to flush in RefCountedBufferingFileStream:

public void sync() throws IOException {
throw new UnsupportedOperationException("S3RecoverableFsDataOutputStream
cannot sync state to S3. " +
"Use persist() to create a persistent recoverable intermediate point.");
}

If there are any pointers here on what should happen, happy to submit a
patch.




On Wed, Oct 7, 2020 at 1:37 AM David Anderson  wrote:

> Dan,
>
> The first point you've raised is a known issue: When a job is stopped, the
> unfinished part files are not transitioned to the finished state. This is
> mentioned in the docs as Important Note 2 [1], and fixing this is waiting
> on FLIP-46 [2]. That section of the docs also includes some S3-specific
> warnings, but nothing pertaining to managing credentials. Perhaps [3] will
> help.
>
> Regards,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials
>
>
> On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse  wrote:
>
>> First, let me say, Flink is super cool - thanks everyone for making my
>> life easier in a lot of ways! Wish I had this 10 years ago
>>
>> Onto the fun stuff: I am attempting to use the StreamingFileSink with S3.
>> Note that Flink is embedded in my app, not running as a standalone cluster.
>>
>> I am having a few problems, which I have illustrated in the small test
>> case below.
>>
>> 1) After my job finishes, data never gets committed to S3. Looking
>> through the code, I've noticed that data gets flushed to disk, but the
>> multi-part upload is never finished. Even though my data doesn't hit the
>> min part size, I would expect that if my job ends, my data should get
>> uploaded since the job is 100% done.
>>
>> I am also having problems when the job is running not uploading - but I
>> haven't been able to distill that down to a simple test case, so I thought
>> I'd start here.
>>
>> 2) The S3 Filesystem does not pull credentials from the Flink
>> Configuration when running in embedded mode. I have a workaround for this,
>> but it is ugly. If you comment out the line in the test case which talks
>> about this workaround, you will end up with a "Java.net.SocketException:
>> Host is down"
>>
>> Can anyone shed light on these two issues? Thanks!
>>
>> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
>> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>> import org.junit.jupiter.api.Test;
>>
>> public class S3Test {
>> @Test
>> public void whyDoesntThisWork() throws Exception {
>> Configuration configuration = new Configuration();
>> configuration.setString("state.backend",
>> MemoryStateBackendFactory.class.getName());
>> configuration.setString("s3.access.key", "");
>> configuration.setString("s3.secret.key", "");
>>
>> // If I don't do this, the S3 filesystem never gets the
>> credentials
>> FileSystem.initialize(configuration, null);
>>
>> LocalStreamEnvironment env =
>> StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>>
>> StreamingFileSink s3 = StreamingFileSink
>> .forRowFormat(new Path("s3://bucket/"), new
>> SimpleStringEncoder())
>> .build();
>>
>> env.fromElements("string1", "string2")
>> .addSink(s3);
>>
>> env.execute();
>>
>> System.out.println("Done");
>> }
>> }
>>
>>
>> --
>> Dan Diephouse
>> @dandiep
>>
>

-- 
Dan Diephouse
@dandiep


Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-07 Thread Dylan Forciea
Ok, I have created FLINK-19522 describing the issue. I have the code I made so 
far checked in at 
https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this 
only fixes the SQL API. It sounds like there may be another change needed for 
the Table API… I’ll look into that and see if I can figure it out on my own 
while they’re out. I will also need to add some unit tests and update some 
documentation to get this ready for a PR.

Thanks,
Dylan

From: Till Rohrmann 
Date: Wednesday, October 7, 2020 at 10:55 AM
To: dev 
Cc: Shengkai Fang , "user@flink.apache.org" 
, "j...@apache.org" , Leonard Xu 

Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Hi Dylan,

thanks for reaching out to the Flink community and excuse our late response. I 
am not an expert for the Table API and its JDBC connector but what you describe 
sounds like a missing feature. Also given that FLINK-12198 enabled this feature 
for the JDBCInputFormat indicates that we might simply need to make it 
configurable from the JdbcTableSource. I am pulling in Jark and Leonard who 
worked on the JdbcTableSource and might help you to get this feature into 
Flink. Their response could take a week because they are currently on vacation 
if I am not mistaken.

What you could already do is to open an issue linking FLINK-12198 and 
describing the problem and your solution proposal.

[1] https://issues.apache.org/jira/browse/FLINK-12198

Cheers,
Till

On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea <dy...@oseberg.io> wrote:
I appreciate it! Let me know if you want me to submit a PR against the issue 
after it is created. It wasn’t a huge amount of code, so it’s probably not a 
big deal if you wanted to redo it.

Thanks,
Dylan

From: Shengkai Fang <fskm...@gmail.com>
Date: Wednesday, October 7, 2020 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Sorry for late response. +1 to support it. I will open a jira about it later.

Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, October 7, 2020 at 9:53 PM:

I hadn’t heard a response on this, so I’m going to expand this to the dev email 
list.



If this is indeed an issue and not my misunderstanding, I have most of a patch 
already coded up. Please let me know, and I can create a JIRA issue and send 
out a PR.



Regards,

Dylan Forciea

Oseberg




From: Dylan Forciea <dy...@oseberg.io>
Date: Thursday, October 1, 2020 at 5:14 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: autoCommit for postgres jdbc streaming in Table/SQL API






Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran 
across an issue with streaming postgres data via the Table/SQL API.



I see that the API has the scan.fetch-size option, but not scan.auto-commit per



https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html
 . I had attempted to load a large table in, but it completely slurped it into 
memory before starting the streaming. I modified the flink source code to add a 
scan.auto-commit

option, and I was then able to immediately start streaming and cut my memory 
usage way down.



I see in this thread that there was a similar issue resolved for 
JDBCInputFormat in this thread:



http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html
 , but I don’t see a way to utilize that in the Table/SQL API.



Am I missing something on how to pull this off?



Regards,

Dylan Forciea

Oseberg






Re: Applying Custom metrics

2020-10-07 Thread Till Rohrmann
Hi Piper,

the RichMapFunction's map function is called for every record you are
processing in your DataStream/DataSet. The RichMapFunction is the
definition of the map function you are applying to every record. Hence, it
is basically what you pass to the DataStream.map(MapFunction myMapFunction)
in order to describe the transformation you want to do.

If you want to define your own metrics, then you need to define a
RichMapFunction and use its getRuntimeContext() method to call

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

in order to register a custom counter. For more information please take a
look at [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#user-scope
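
Put together, a complete map function with a custom counter could look roughly
like this (the types, the group name and the metric name are just an example):

```
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

  private transient Counter counter;

  @Override
  public void open(Configuration parameters) {
    // Register the custom metric once, when the operator instance is initialized.
    counter = getRuntimeContext()
        .getMetricGroup()
        .addGroup("MyMetrics")
        .counter("myCounter");
  }

  @Override
  public String map(String value) {
    // Invoked once per record that flows through this operator.
    counter.inc();
    return value;
  }
}
```

You then attach it to the part of the DAG you want to measure, e.g.
stream.map(new CountingMapper()); in your own rich source or sink functions
the registration works the same way via getRuntimeContext().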

Cheers,
Till

On Tue, Oct 6, 2020 at 1:05 AM Piper Piper  wrote:

> Hi
>
> I have questions regarding making my own custom metrics.
>
> When exactly is the class RichMapFunction’s map(value) method
> called/invoked, and what “value” will be passed/expected as an argument to
> this map(value) method?
>
> Does the RichMapFunction’s map() method have any relation to the
> transformation map() method, or are they completely different?
>
> Once defined, how do I put the custom metric onto a specific source,
> specific operator, and onto a specific sink in my job’s DAG?
>
> Thank you,
>
> Piper
>


Re: windowsState() and globalState()

2020-10-07 Thread Till Rohrmann
Hi Jiazhi,

here is a description of Flink's windowing API and also how to use the
windowState() and globalState() method of the ProcessWindowFunction [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
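
As a rough sketch (the types and state descriptors are just an example):
windowState() is scoped to the current key and the current window, while
globalState() is scoped to the key only and survives across windows.

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountingWindowFunction
    extends ProcessWindowFunction<String, String, String, TimeWindow> {

  @Override
  public void process(String key, Context ctx, Iterable<String> elements,
      Collector<String> out) throws Exception {
    // Per-window state: scoped to this key *and* this window (clean it up in clear()).
    ValueState<Long> perWindow = ctx.windowState().getState(
        new ValueStateDescriptor<>("firings-per-window", Long.class));
    // Global state: scoped to this key only, survives across windows.
    ValueState<Long> perKey = ctx.globalState().getState(
        new ValueStateDescriptor<>("firings-per-key", Long.class));

    long windowFirings = perWindow.value() == null ? 1L : perWindow.value() + 1L;
    long keyFirings = perKey.value() == null ? 1L : perKey.value() + 1L;
    perWindow.update(windowFirings);
    perKey.update(keyFirings);

    out.collect(key + ": firing " + windowFirings + " of this window, "
        + keyFirings + " overall for this key");
  }
}
```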

Cheers,
Till

On Tue, Oct 6, 2020 at 2:42 PM ゞ野蠻遊戲χ  wrote:

> Dear all:
>
>   How do I use the windowState() method and the globalState() method
> in a ProcessWindowFunction? Could you give a demo?
>
> Thanks,
> Jiazhi
>


Re: flink configuration: best practice for checkpoint storage secrets

2020-10-07 Thread Till Rohrmann
Hi Qinghui,

the recommended way would be to use AWS identity and access management
(IAM) [1] if possible.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#configure-access-credentials

Cheers,
Till

On Wed, Oct 7, 2020 at 12:31 PM XU Qinghui  wrote:

> Hello, folks
>
> We are trying to use S3 for the checkpoint storage, and this involves some
> secrets in the configuration. We tried two approaches to configure those
> secrets:
> - in the jvm application argument for jobmanager and taskmanager, such as
> -Ds3.secret-key
> - in the flink-conf.yaml file for jobmanager and taskmanager
>
> Is there a third way? What's the best practice?
> Thanks a lot!
>
> Best regards,
> Qinghui
>


Re: Flink Kubernetes Libraries

2020-10-07 Thread Till Rohrmann
Hi Saksham,

the easiest approach would probably be to include the required libraries in
your user code jar which you submit to the cluster. Using Maven's shade
plugin should help with this task. Alternatively, you could also create a
custom Flink Docker image where you add the required libraries to the
FLINK_HOME/lib directory. This would, however, mean that every job you
submit to the Flink cluster would see these libraries on the system class
path.
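
A minimal sketch of the second option; the base image tag and jar names are
placeholders:

# Dockerfile for a custom Flink image
FROM flink:1.11.2-scala_2.11
# Jars under /opt/flink/lib end up on the system class path of every job
COPY my-connector.jar /opt/flink/lib/
COPY my-other-library.jar /opt/flink/lib/

You would then point the jobmanager and taskmanager deployments at this custom
image instead of the stock one.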

Cheers,
Till

On Wed, Oct 7, 2020 at 2:08 PM saksham sapra 
wrote:

> Hi,
>
> I have made some configuration using this page:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
> and I am able to run Flink and reach the UI, but I need to submit a job using:
> http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy/#/submit
> through Postman. I have some libraries which locally I can add to the lib
> folder, but in this setup how can I add my libraries so that it works properly?
>
> [image: image.png]
>


Re: Is it possible that late events are processed before the window?

2020-10-07 Thread Till Rohrmann
Hi Ori,

you are right. Events are being sent down the side output for late events
if the event's timestamp + the allowed lateness is smaller than the current
watermark. These events are directly seen by downstream operators which
consume the side output for late events.
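
For reference, a minimal sketch of how the pieces fit together, assuming a
DataStream<String> called events with timestamps and watermarks already
assigned (names are illustrative and the usual DataStream API imports are omitted):

final OutputTag<String> lateTag = new OutputTag<String>("late-events") {};

SingleOutputStreamOperator<String> main = events
    .keyBy(value -> value)
    .window(TumblingEventTimeWindows.of(Time.minutes(30)))
    .allowedLateness(Time.minutes(5))
    .sideOutputLateData(lateTag)
    .reduce((a, b) -> a + b);

// Records whose timestamp plus the allowed lateness is behind the watermark are
// emitted here right away by the window operator, without waiting for a window to fire.
DataStream<String> lateEvents = main.getSideOutput(lateTag);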

Cheers,
Till

On Wed, Oct 7, 2020 at 2:32 PM Ori Popowski  wrote:

> After creating a toy example I think that I've got the concept of
> lateDataOutput wrong.
>
> It seems that the lateDataSideOutput has nothing to do with windowing;
> when events arrive late they'll just go straight to the side output, and
> there can never be any window firing of the main flow for that specific key.
>
> On Wed, Oct 7, 2020 at 2:42 PM Ori Popowski  wrote:
>
>> I've made an experiment where I use an evictor on the main window (not
>> the late one), only to write a debug file when the window fires (I don't
>> actually evict events, I've made it so I can write a debug object the
>> moment the window finishes).
>>
>> I can see that indeed the late data window fires before the main window,
>> since the mentioned debug file does not exist, but late events _do_ exist
>> in the destination.
>>
>> Writing this debug object in the evictor eliminates potential problems
>> that might be due to logic in the process function, and it proves that the
>> window of the late events indeed fires before the main window.
>>
>> Here's an outline of my job:
>>
>> val windowedStream = senv
>>   .addSource(kafkaSource)
>>   ... // some operators
>>   // like BoundedOutOfOrdereness but ignore future timestamps
>>   .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
>>   ... // some more operators
>>   .keyingBy { case (meta, _) => meta.toPath }
>>   .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main"
>> window
>>   .sideOutputLateData(lateDataTag)
>>   .process(new ProcessSession(sessionPlayback, config))
>> windowedStream
>>   .map(new SerializeSession(sessionPlayback))
>>   .addSink(sink)
>> windowedStream
>>   .getSideOutput(lateDataTag)
>>   .keyingBy { case (meta, _) => meta.toPath }
>>   .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late"
>> window
>>   .process(new ProcessSession(sessionPlayback, config, true))
>>   .map(new SerializeSession(sessionPlayback, late = true))
>>
>> So, to repeat the question, is that normal? And if not - how can I fix
>> this?
>>
>> Thanks
>>
>> On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski  wrote:
>>
>>>
>>> I have a job with event-time session window of 30 minutes.
>>>
>>> I output late events to side output, where I have a tumbling processing
>>> time window of 30 minutes.
>>>
>>> I observe that the late events are written to storage before the "main"
>>> events.
>>>
>>> I wanted to know if it's normal before digging into the code and
>>> debugging the problem.
>>>
>>> Thanks
>>>
>>


Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-07 Thread Till Rohrmann
Hi Dylan,

thanks for reaching out to the Flink community and excuse our late
response. I am not an expert for the Table API and its JDBC connector but
what you describe sounds like a missing feature. The fact that FLINK-12198
enabled this for the JDBCInputFormat also indicates that we might simply need
to make it configurable from the JdbcTableSource. I am
pulling in Jark and Leonard who worked on the JdbcTableSource and might
help you to get this feature into Flink. Their response could take a week
because they are currently on vacation if I am not mistaken.

What you could already do is to open an issue linking FLINK-12198 and
describing the problem and your solution proposal.

[1] https://issues.apache.org/jira/browse/FLINK-12198
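
For reference, the kind of DDL this would enable might look roughly as follows;
'scan.auto-commit' here is only the option proposed in this thread, not something
the released connector understands yet, and the table/column names are placeholders:

CREATE TABLE big_source_table (
  id BIGINT,
  payload STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/mydb',
  'table-name' = 'big_source_table',
  'scan.fetch-size' = '10000',
  'scan.auto-commit' = 'false'
);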

Cheers,
Till

On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea  wrote:

> I appreciate it! Let me know if you want me to submit a PR against the
> issue after it is created. It wasn’t a huge amount of code, so it’s
> probably not a big deal if you wanted to redo it.
>
> Thanks,
> Dylan
>
> From: Shengkai Fang 
> Date: Wednesday, October 7, 2020 at 9:06 AM
> To: Dylan Forciea 
> Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
> Sorry for late response. +1 to support it. I will open a jira about it
> later.
>
> Dylan Forciea mailto:dy...@oseberg.io>> wrote on Wednesday, October 7, 2020
> at 9:53 PM:
>
>
>
>
>
>
>
>
>
>
>
>
>
> I hadn’t heard a response on this, so I’m going to expand this to the dev
> email list.
>
>
>
> If this is indeed an issue and not my misunderstanding, I have most of a
> patch already coded up. Please let me know, and I can create a JIRA issue
> and send out a PR.
>
>
>
> Regards,
>
> Dylan Forciea
>
> Oseberg
>
>
>
>
> From: Dylan Forciea mailto:dy...@oseberg.io>>
>
>
> Date: Thursday, October 1, 2020 at 5:14 PM
>
>
> To: "user@flink.apache.org" <
> user@flink.apache.org>
>
>
> Subject: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
>
>
>
> Hi! I’ve just recently started evaluating Flink for our ETL needs, and I
> ran across an issue with streaming postgres data via the Table/SQL API.
>
>
>
> I see that the API has the scan.fetch-size option, but not
> scan.auto-commit per
>
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html
> . I had attempted to load a large table in, but it completely slurped it
> into memory before starting the streaming. I modified the flink source code
> to add a scan.auto-commit
>
> option, and I was then able to immediately start streaming and cut my
> memory usage way down.
>
>
>
> I see in this thread that there was a similar issue resolved for
> JDBCInputFormat in this thread:
>
>
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html
> , but I don’t see a way to utilize that in the Table/SQL API.
>
>
>
> Am I missing something on how to pull this off?
>
>
>
> Regards,
>
> Dylan Forciea
>
> Oseberg
>
>
>
>
>
>


Re: The file STDOUT does not exist on the TaskExecutor

2020-10-07 Thread Till Rohrmann
Hi Sidhant,

when using Flink's Docker image, the cluster won't create the .out
files. Instead, the components write directly to STDOUT, which is
captured by Kubernetes and can be viewed using `kubectl logs POD_NAME`. The
error which appears in your logs is not a problem; it is simply the REST
handler trying to serve the .out files.

Cheers,
Till

On Wed, Oct 7, 2020 at 5:11 PM 大森林  wrote:

> what's your running mode?
> if your flink cluster is on yarn mode,then the output you need has no
> relation to $FLINK_HOME/logs/*.out
>
>
> -- Original Message --
> *From:* "sidhant gupta" ;
> *Sent:* Wednesday, October 7, 2020, 11:33 PM
> *To:* "大森林";"user";
> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>
> Hi,
>
> I'm running flink cluster in ecs. There is a pipeline which creates the
> job manager and then the task manager using the docker image.
>
> Not sure if we would want to restart the cluster in production.
>
> Is there any way we can make sure the .out files will be created without
> restart ?
>
> I am able to see the logs in the logs tab but not the stdout logs in the
> web ui and getting the below mentioned error after running the job.
>
> Thanks
> Sidhant Gupta
>
>
> On Wed, Oct 7, 2020, 8:00 PM 大森林  wrote:
>
>> it's easy,
>> just restart your flink cluster(standalone mode)
>>
>> if you run flink in yarn mode,then the result will display on
>> $HADOOP/logs/*.out files
>>
>> -- Original Message --
>> *From:* "sidhant gupta" ;
>> *Sent:* Wednesday, October 7, 2020, 9:52 PM
>> *To:* "大森林";
>> *Cc:* "user";
>> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>>
>> ++ user
>>
>> On Wed, Oct 7, 2020, 6:47 PM sidhant gupta  wrote:
>>
>>> Hi
>>>
>>> I checked in the $FLINK_HOME/logs. The .out file was not there. Can you
>>> suggest what should be the action item ?
>>>
>>> Thanks
>>> Sidhant Gupta
>>>
>>>
>>> On Wed, Oct 7, 2020, 7:17 AM 大森林  wrote:
>>>

 check if the .out file is in $FLINK_HOME/logs  please.

 -- Original Message --
 *From:* "sidhant gupta" ;
 *Sent:* Wednesday, October 7, 2020, 1:52 AM
 *To:* "大森林";
 *Subject:* Re: The file STDOUT does not exist on the TaskExecutor

 Hi,

 I am just running the docker container as it is by adding just the
 conf/flink.yaml .
 I am not sure if the .out file got deleted. Do we need to expose some
 ports ?

 Thanks
 Sidhant Gupta



 On Tue, Oct 6, 2020, 8:51 PM 大森林  wrote:

>
> Hi,I guess you may deleted .out file in $FLINK_HOME/logs.
> you can just use your default log settings.
> -- Original Message --
> *From:* "sidhant gupta" ;
> *Sent:* Tuesday, October 6, 2020, 10:59 PM
> *To:* "user";
> *Subject:* The file STDOUT does not exist on the TaskExecutor
>
> Hi,
>
> I am running dockerized flink:1.11.0-scala_2.11 container in ecs. I
> am getting the following error after the job runs:
>
> ERROR org.apache.flink.runtime.rest.handler.taskmanager.
> TaskManagerStdoutFileHandler [] - Unhandled exception.
> org.apache.flink.util.FlinkException: The file STDOUT does not exist
> on the TaskExecutor.
> at org.apache.flink.runtime.taskexecutor.TaskExecutor
> .lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(
> CompletableFuture.java:1604) ~[?:1.8.0_262]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_262]
>
> I guess "file" needs to be added in log4j.properties in the docker
> container e.g. log4j.rootLogger=INFO, file
> Are there any other properties which needs to be configured in any of
> the other property files or any jar needs to be added in the */opt/flink
> *path ?
> Thanks
> Sidhant Gupta
>
>


Re: Statefun + Confluent Fully-managed Kafka

2020-10-07 Thread Till Rohrmann
Hi Hezekiah, thanks for reporting this issue. I am pulling Gordon and Igal
in who might be able to help you with this problem.

Cheers,
Till

On Wed, Oct 7, 2020 at 3:56 PM hezekiah maina 
wrote:

> Hi,
>
> I'm trying to use Stateful Functions with Kafka as my ingress and egress.
> I'm using the Confluent fully-managed Kafka and I'm having a challenge
> adding my authentication details in the module.yaml file.
> Here is my current config details:
> version: "1.0"
> module:
>   meta:
> type: remote
>   spec:
> functions:
>   - function:
>   meta:
> kind: http
> type: example/greeter
>   spec:
> endpoint: 
> states:
>   - seen_count
> maxNumBatchRequests: 500
> timeout: 2min
> ingresses:
>   - ingress:
>   meta:
> type: statefun.kafka.io/routable-protobuf-ingress
> id: example/names
>   spec:
> address: 
> consumerGroupId: statefun-consumer-group
> topics:
>   - topic: names
> typeUrl: com.googleapis/example.GreetRequest
> targets:
>   - example/greeter
> properties:
>   - bootstrap.servers:
>   - security.protocol: SASL_SSL
>   - sasl.mechanism: PLAIN
>   - sasl.jaas.config:
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="USERNAME" password="PASSWORD";
>   - ssl.endpoint.identification.algorithm: https
> egresses:
>   - egress:
>   meta:
> type: statefun.kafka.io/generic-egress
> id: example/greets
>   spec:
> address: 
> deliverySemantic:
>   type: exactly-once
>   transactionTimeoutMillis: 10
> properties:
>   - bootstrap.servers: 
>   - security.protocol: SASL_SSL
>   - sasl.mechanisms: PLAIN
>   - sasl.jaas.config:
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="USERNAME" password="PASSWORD";
>   - ssl.endpoint.identification.algorithm: https
>
> After running docker-compose with a master and worker containers I'm
> getting this error:
> Could not find a 'KafkaClient' entry in the JAAS configuration. System
> property 'java.security.auth.login.config' is
> /tmp/jaas-2846080966990890307.conf
>
> The producer config logged :
> worker_1  | 2020-10-07 13:38:08,489 INFO
>  org.apache.kafka.clients.producer.ProducerConfig  -
> ProducerConfig values:
> worker_1  | acks = 1
> worker_1  | batch.size = 16384
> worker_1  | bootstrap.servers = [https://
> ---.asia-southeast1.gcp.confluent.cloud:9092]
> worker_1  | buffer.memory = 33554432
> worker_1  | client.dns.lookup = default
> worker_1  | client.id =
> worker_1  | compression.type = none
> worker_1  | connections.max.idle.ms = 54
> worker_1  | delivery.timeout.ms = 12
> worker_1  | enable.idempotence = false
> worker_1  | interceptor.classes = []
> worker_1  | key.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
> worker_1  | linger.ms = 0
> worker_1  | max.block.ms = 6
> worker_1  | max.in.flight.requests.per.connection = 5
> worker_1  | max.request.size = 1048576
> worker_1  | metadata.max.age.ms = 30
> worker_1  | metric.reporters = []
> worker_1  | metrics.num.samples = 2
> worker_1  | metrics.recording.level = INFO
> worker_1  | metrics.sample.window.ms = 3
> worker_1  | partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
> worker_1  | receive.buffer.bytes = 32768
> worker_1  | reconnect.backoff.max.ms = 1000
> worker_1  | reconnect.backoff.ms = 50
> worker_1  | request.timeout.ms = 3
> worker_1  | retries = 2147483647
> worker_1  | retry.backoff.ms = 100
> worker_1  | sasl.client.callback.handler.class = null
> worker_1  | sasl.jaas.config = null
> worker_1  | sasl.kerberos.kinit.cmd = /usr/bin/kinit
> worker_1  | sasl.kerberos.min.time.before.relogin = 6
> worker_1  | sasl.kerberos.service.name = null
> worker_1  | sasl.kerberos.ticket.renew.jitter = 0.05
> worker_1  | sasl.kerberos.ticket.renew.window.factor = 0.8
> worker_1  | sasl.login.callback.handler.class = null
> worker_1  | sasl.login.class = null
> worker_1  | sasl.login.refresh.buffer.seconds = 300
> worker_1  | sasl.login.refresh.min.period.seconds = 60
> worker_1  | sasl.login.refresh.window.factor = 0.8
> worker_1  | sasl.login.refresh.window.jitter = 0.05
> worker_1  | sasl.mechanism = GSSAPI
> worker_1  | security.protocol = SASL_SSL
> worker_1  | send.buffer.bytes = 131072
> worker_1  | ssl.cipher.suites = null
> worker_1  | ssl.enabled.protocols =

Re: The file STDOUT does not exist on the TaskExecutor

2020-10-07 Thread 大森林
what's your running mode?
if your flink cluster is on yarn mode, then the output you need has no relation
to $FLINK_HOME/logs/*.out




-- Original Message --
From: "sidhant gupta" 


Re: The file STDOUT does not exist on the TaskExecutor

2020-10-07 Thread sidhant gupta
Hi,

I'm running flink cluster in ecs. There is a pipeline which creates the job
manager and then the task manager using the docker image.

Not sure if we would want to restart the cluster in production.

Is there any way we can make sure the .out files will be created without
restart ?

I am able to see the logs in the logs tab but not the stdout logs in the
web ui and getting the below mentioned error after running the job.

Thanks
Sidhant Gupta


On Wed, Oct 7, 2020, 8:00 PM 大森林  wrote:

> it's easy,
> just restart your flink cluster(standalone mode)
>
> if you run flink in yarn mode,then the result will display on
> $HADOOP/logs/*.out files
>
> -- Original Message --
> *From:* "sidhant gupta" ;
> *Sent:* Wednesday, October 7, 2020, 9:52 PM
> *To:* "大森林";
> *Cc:* "user";
> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>
> ++ user
>
> On Wed, Oct 7, 2020, 6:47 PM sidhant gupta  wrote:
>
>> Hi
>>
>> I checked in the $FLINK_HOME/logs. The .out file was not there. Can you
>> suggest what should be the action item ?
>>
>> Thanks
>> Sidhant Gupta
>>
>>
>> On Wed, Oct 7, 2020, 7:17 AM 大森林  wrote:
>>
>>>
>>> check if the .out file is in $FLINK_HOME/logs  please.
>>>
>>> -- Original Message --
>>> *From:* "sidhant gupta" ;
>>> *Sent:* Wednesday, October 7, 2020, 1:52 AM
>>> *To:* "大森林";
>>> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>>>
>>> Hi,
>>>
>>> I am just running the docker container as it is by adding just the
>>> conf/flink.yaml .
>>> I am not sure if the .out file got deleted. Do we need to expose some
>>> ports ?
>>>
>>> Thanks
>>> Sidhant Gupta
>>>
>>>
>>>
>>> On Tue, Oct 6, 2020, 8:51 PM 大森林  wrote:
>>>

 Hi,I guess you may deleted .out file in $FLINK_HOME/logs.
 you can just use your default log settings.
 -- Original Message --
 *From:* "sidhant gupta" ;
 *Sent:* Tuesday, October 6, 2020, 10:59 PM
 *To:* "user";
 *Subject:* The file STDOUT does not exist on the TaskExecutor

 Hi,

 I am running dockerized flink:1.11.0-scala_2.11 container in ecs. I am
 getting the following error after the job runs:

 ERROR org.apache.flink.runtime.rest.handler.taskmanager.
 TaskManagerStdoutFileHandler [] - Unhandled exception.
 org.apache.flink.util.FlinkException: The file STDOUT does not exist on
 the TaskExecutor.
 at org.apache.flink.runtime.taskexecutor.TaskExecutor
 .lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(
 CompletableFuture.java:1604) ~[?:1.8.0_262]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
 at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_262]

 I guess "file" needs to be added in log4j.properties in the docker
 container e.g. log4j.rootLogger=INFO, file
 Are there any other properties which needs to be configured in any of
 the other property files or any jar needs to be added in the */opt/flink
 *path ?
 Thanks
 Sidhant Gupta




Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-07 Thread Dylan Forciea
I appreciate it! Let me know if you want me to submit a PR against the issue 
after it is created. It wasn’t a huge amount of code, so it’s probably not a 
big deal if you wanted to redo it.

Thanks,
Dylan

From: Shengkai Fang 
Date: Wednesday, October 7, 2020 at 9:06 AM
To: Dylan Forciea 
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Sorry for late response. +1 to support it. I will open a jira about it later.

Dylan Forciea mailto:dy...@oseberg.io>> wrote on Wednesday, October 7, 2020 at 9:53 PM:













I hadn’t heard a response on this, so I’m going to expand this to the dev email 
list.



If this is indeed an issue and not my misunderstanding, I have most of a patch 
already coded up. Please let me know, and I can create a JIRA issue and send 
out a PR.



Regards,

Dylan Forciea

Oseberg




From: Dylan Forciea mailto:dy...@oseberg.io>>


Date: Thursday, October 1, 2020 at 5:14 PM


To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>


Subject: autoCommit for postgres jdbc streaming in Table/SQL API






Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran 
across an issue with streaming postgres data via the Table/SQL API.



I see that the API has the scan.fetch-size option, but not scan.auto-commit per



https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html
 . I had attempted to load a large table in, but it completely slurped it into 
memory before starting the streaming. I modified the flink source code to add a 
scan.auto-commit

option, and I was then able to immediately start streaming and cut my memory 
usage way down.



I see in this thread that there was a similar issue resolved for 
JDBCInputFormat in this thread:



http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html
 , but I don’t see a way to utilize that in the Table/SQL API.



Am I missing something on how to pull this off?



Regards,

Dylan Forciea

Oseberg







Re: The file STDOUT does not exist on the TaskExecutor

2020-10-07 Thread 大森林
it's easy,
just restart your flink cluster (standalone mode)


if you run flink in yarn mode, then the result will display on
$HADOOP/logs/*.out files


-- Original Message --
From: "sidhant gupta" 


Statefun + Confluent Fully-managed Kafka

2020-10-07 Thread hezekiah maina
Hi,

I'm trying to use Stateful Functions with Kafka as my ingress and egress.
I'm using the Confluent fully-managed Kafka and I'm having a challenge
adding my authentication details in the module.yaml file.
Here is my current config details:
version: "1.0"
module:
  meta:
type: remote
  spec:
functions:
  - function:
  meta:
kind: http
type: example/greeter
  spec:
endpoint: 
states:
  - seen_count
maxNumBatchRequests: 500
timeout: 2min
ingresses:
  - ingress:
  meta:
type: statefun.kafka.io/routable-protobuf-ingress
id: example/names
  spec:
address: 
consumerGroupId: statefun-consumer-group
topics:
  - topic: names
typeUrl: com.googleapis/example.GreetRequest
targets:
  - example/greeter
properties:
  - bootstrap.servers:
  - security.protocol: SASL_SSL
  - sasl.mechanism: PLAIN
  - sasl.jaas.config:
org.apache.kafka.common.security.plain.PlainLoginModule required
username="USERNAME" password="PASSWORD";
  - ssl.endpoint.identification.algorithm: https
egresses:
  - egress:
  meta:
type: statefun.kafka.io/generic-egress
id: example/greets
  spec:
address: 
deliverySemantic:
  type: exactly-once
  transactionTimeoutMillis: 10
properties:
  - bootstrap.servers: 
  - security.protocol: SASL_SSL
  - sasl.mechanisms: PLAIN
  - sasl.jaas.config:
org.apache.kafka.common.security.plain.PlainLoginModule required
username="USERNAME" password="PASSWORD";
  - ssl.endpoint.identification.algorithm: https

After running docker-compose with a master and worker containers I'm
getting this error:
Could not find a 'KafkaClient' entry in the JAAS configuration. System
property 'java.security.auth.login.config' is
/tmp/jaas-2846080966990890307.conf

The producer config logged :
worker_1  | 2020-10-07 13:38:08,489 INFO
 org.apache.kafka.clients.producer.ProducerConfig  -
ProducerConfig values:
worker_1  | acks = 1
worker_1  | batch.size = 16384
worker_1  | bootstrap.servers = [https://
---.asia-southeast1.gcp.confluent.cloud:9092]
worker_1  | buffer.memory = 33554432
worker_1  | client.dns.lookup = default
worker_1  | client.id =
worker_1  | compression.type = none
worker_1  | connections.max.idle.ms = 54
worker_1  | delivery.timeout.ms = 12
worker_1  | enable.idempotence = false
worker_1  | interceptor.classes = []
worker_1  | key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
worker_1  | linger.ms = 0
worker_1  | max.block.ms = 6
worker_1  | max.in.flight.requests.per.connection = 5
worker_1  | max.request.size = 1048576
worker_1  | metadata.max.age.ms = 30
worker_1  | metric.reporters = []
worker_1  | metrics.num.samples = 2
worker_1  | metrics.recording.level = INFO
worker_1  | metrics.sample.window.ms = 3
worker_1  | partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
worker_1  | receive.buffer.bytes = 32768
worker_1  | reconnect.backoff.max.ms = 1000
worker_1  | reconnect.backoff.ms = 50
worker_1  | request.timeout.ms = 3
worker_1  | retries = 2147483647
worker_1  | retry.backoff.ms = 100
worker_1  | sasl.client.callback.handler.class = null
worker_1  | sasl.jaas.config = null
worker_1  | sasl.kerberos.kinit.cmd = /usr/bin/kinit
worker_1  | sasl.kerberos.min.time.before.relogin = 6
worker_1  | sasl.kerberos.service.name = null
worker_1  | sasl.kerberos.ticket.renew.jitter = 0.05
worker_1  | sasl.kerberos.ticket.renew.window.factor = 0.8
worker_1  | sasl.login.callback.handler.class = null
worker_1  | sasl.login.class = null
worker_1  | sasl.login.refresh.buffer.seconds = 300
worker_1  | sasl.login.refresh.min.period.seconds = 60
worker_1  | sasl.login.refresh.window.factor = 0.8
worker_1  | sasl.login.refresh.window.jitter = 0.05
worker_1  | sasl.mechanism = GSSAPI
worker_1  | security.protocol = SASL_SSL
worker_1  | send.buffer.bytes = 131072
worker_1  | ssl.cipher.suites = null
worker_1  | ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
worker_1  | ssl.endpoint.identification.algorithm = https
worker_1  | ssl.key.password = null
worker_1  | ssl.keymanager.algorithm = SunX509
worker_1  | ssl.keystore.location = null
worker_1  | ssl.keystore.password = null
worker_1  | ssl.keystore.type = JKS
worker_1  | ssl.protocol = TLS
worker_1  | ssl.provider = null
worker_1  | ssl.secure.random.implementation = null

Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-07 Thread Dylan Forciea
I hadn’t heard a response on this, so I’m going to expand this to the dev email 
list.

If this is indeed an issue and not my misunderstanding, I have most of a patch 
already coded up. Please let me know, and I can create a JIRA issue and send 
out a PR.

Regards,
Dylan Forciea
Oseberg

From: Dylan Forciea 
Date: Thursday, October 1, 2020 at 5:14 PM
To: "user@flink.apache.org" 
Subject: autoCommit for postgres jdbc streaming in Table/SQL API

Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran 
across an issue with streaming postgres data via the Table/SQL API.

I see that the API has the scan.fetch-size option, but not scan.auto-commit per 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html
 . I had attempted to load a large table in, but it completely slurped it into 
memory before starting the streaming. I modified the flink source code to add a 
scan.auto-commit option, and I was then able to immediately start streaming and 
cut my memory usage way down.

I see in this thread that there was a similar issue resolved for 
JDBCInputFormat in this thread: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html
 , but I don’t see a way to utilize that in the Table/SQL API.

Am I missing something on how to pull this off?

Regards,
Dylan Forciea
Oseberg


Re: The file STDOUT does not exist on the TaskExecutor

2020-10-07 Thread sidhant gupta
++ user

On Wed, Oct 7, 2020, 6:47 PM sidhant gupta  wrote:

> Hi
>
> I checked in the $FLINK_HOME/logs. The .out file was not there. Can you
> suggest what should be the action item ?
>
> Thanks
> Sidhant Gupta
>
>
> On Wed, Oct 7, 2020, 7:17 AM 大森林  wrote:
>
>>
>> check if the .out file is in $FLINK_HOME/logs  please.
>>
>> -- Original Message --
>> *From:* "sidhant gupta" ;
>> *Sent:* Wednesday, October 7, 2020, 1:52 AM
>> *To:* "大森林";
>> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>>
>> Hi,
>>
>> I am just running the docker container as it is by adding just the
>> conf/flink.yaml .
>> I am not sure if the .out file got deleted. Do we need to expose some
>> ports ?
>>
>> Thanks
>> Sidhant Gupta
>>
>>
>>
>> On Tue, Oct 6, 2020, 8:51 PM 大森林  wrote:
>>
>>>
>>> Hi,I guess you may deleted .out file in $FLINK_HOME/logs.
>>> you can just use your default log settings.
>>> -- Original Message --
>>> *From:* "sidhant gupta" ;
>>> *Sent:* Tuesday, October 6, 2020, 10:59 PM
>>> *To:* "user";
>>> *Subject:* The file STDOUT does not exist on the TaskExecutor
>>>
>>> Hi,
>>>
>>> I am running dockerized flink:1.11.0-scala_2.11 container in ecs. I am
>>> getting the following error after the job runs:
>>>
>>> ERROR org.apache.flink.runtime.rest.handler.taskmanager.
>>> TaskManagerStdoutFileHandler [] - Unhandled exception.
>>> org.apache.flink.util.FlinkException: The file STDOUT does not exist on
>>> the TaskExecutor.
>>> at org.apache.flink.runtime.taskexecutor.TaskExecutor
>>> .lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
>>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(
>>> CompletableFuture.java:1604) ~[?:1.8.0_262]
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
>>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_262]
>>>
>>> I guess "file" needs to be added in log4j.properties in the docker
>>> container e.g. log4j.rootLogger=INFO, file
>>> Are there any other properties which needs to be configured in any of
>>> the other property files or any jar needs to be added in the */opt/flink
>>> *path ?
>>> Thanks
>>> Sidhant Gupta
>>>
>>>


Re: Is it possible that late events are processed before the window?

2020-10-07 Thread Ori Popowski
After creating a toy example I think that I've got the concept of
lateDataOutput wrong.

It seems that the lateDataSideOutput has nothing to do with windowing; when
events arrive late they'll just go straight to the side output, and there
can never be any window firing of the main flow for that specific key.

On Wed, Oct 7, 2020 at 2:42 PM Ori Popowski  wrote:

> I've made an experiment where I use an evictor on the main window (not the
> late one), only to write a debug file when the window fires (I don't
> actually evict events, I've made it so I can write a debug object the
> moment the window finishes).
>
> I can see that indeed the late data window fires before the main window,
> since the mentioned debug file does not exist, but late events _do_ exist
> in the destination.
>
> Writing this debug object in the evictor eliminates potential problems
> that might be due to logic in the process function, and it proves that the
> window of the late events indeed fires before the main window.
>
> Here's an outline of my job:
>
> val windowedStream = senv
>   .addSource(kafkaSource)
>   ... // some operators
>   // like BoundedOutOfOrdereness but ignore future timestamps
>   .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
>   ... // some more operators
>   .keyingBy { case (meta, _) => meta.toPath }
>   .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main"
> window
>   .sideOutputLateData(lateDataTag)
>   .process(new ProcessSession(sessionPlayback, config))
> windowedStream
>   .map(new SerializeSession(sessionPlayback))
>   .addSink(sink)
> windowedStream
>   .getSideOutput(lateDataTag)
>   .keyingBy { case (meta, _) => meta.toPath }
>   .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late"
> window
>   .process(new ProcessSession(sessionPlayback, config, true))
>   .map(new SerializeSession(sessionPlayback, late = true))
>
> So, to repeat the question, is that normal? And if not - how can I fix
> this?
>
> Thanks
>
> On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski  wrote:
>
>>
>> I have a job with event-time session window of 30 minutes.
>>
>> I output late events to side output, where I have a tumbling processing
>> time window of 30 minutes.
>>
>> I observe that the late events are written to storage before the "main"
>> events.
>>
>> I wanted to know if it's normal before digging into the code and
>> debugging the problem.
>>
>> Thanks
>>
>


Flink Kubernetes Libraries

2020-10-07 Thread saksham sapra
Hi,

I have made some configuration using this page:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
and I am able to run Flink and reach the UI, but I need to submit a job using:
http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy/#/submit
through Postman. I have some libraries which locally I can add to the lib
folder, but in this setup how can I add my libraries so that it works properly?

[image: image.png]


Re: Is it possible that late events are processed before the window?

2020-10-07 Thread Ori Popowski
I've made an experiment where I use an evictor on the main window (not the
late one), only to write a debug file when the window fires (I don't
actually evict events, I've made it so I can write a debug object the
moment the window finishes).

I can see that indeed the late data window fires before the main window,
since the mentioned debug file does not exist, but late events _do_ exist
in the destination.

Writing this debug object in the evictor eliminates potential problems that
might be due to logic in the process function, and it proves that the
window of the late events indeed fires before the main window.

Here's an outline of my job:

val windowedStream = senv
  .addSource(kafkaSource)
  ... // some operators
  // like BoundedOutOfOrdereness but ignore future timestamps
  .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
  ... // some more operators
  .keyingBy { case (meta, _) => meta.toPath }
  .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main"
window
  .sideOutputLateData(lateDataTag)
  .process(new ProcessSession(sessionPlayback, config))
windowedStream
  .map(new SerializeSession(sessionPlayback))
  .addSink(sink)
windowedStream
  .getSideOutput(lateDataTag)
  .keyingBy { case (meta, _) => meta.toPath }
  .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late"
window
  .process(new ProcessSession(sessionPlayback, config, true))
  .map(new SerializeSession(sessionPlayback, late = true))

So, to repeat the question, is that normal? And if not - how can I fix this?

Thanks

On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski  wrote:

>
> I have a job with event-time session window of 30 minutes.
>
> I output late events to side output, where I have a tumbling processing
> time window of 30 minutes.
>
> I observe that the late events are written to storage before the "main"
> events.
>
> I wanted to know if it's normal before digging into the code and debugging
> the problem.
>
> Thanks
>


flink configuration: best practice for checkpoint storage secrets

2020-10-07 Thread XU Qinghui
Hello, folks

We are trying to use S3 for the checkpoint storage, and this involves some
secrets in the configuration. We tried two approaches to configure those
secrets:
- in the jvm application argument for jobmanager and taskmanager, such as
-Ds3.secret-key
- in the flink-conf.yaml file for jobmanager and taskmanager

Is there a third way? What's the best practice?
Thanks a lot!

Best regards,
Qinghui


Re: S3 StreamingFileSink issues

2020-10-07 Thread David Anderson
Dan,

The first point you've raised is a known issue: When a job is stopped, the
unfinished part files are not transitioned to the finished state. This is
mentioned in the docs as Important Note 2 [1], and fixing this is waiting
on FLIP-46 [2]. That section of the docs also includes some S3-specific
warnings, but nothing pertaining to managing credentials. Perhaps [3] will
help.

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials


On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse  wrote:

> First, let me say, Flink is super cool - thanks everyone for making my
> life easier in a lot of ways! Wish I had this 10 years ago
>
> Onto the fun stuff: I am attempting to use the StreamingFileSink with S3.
> Note that Flink is embedded in my app, not running as a standalone cluster.
>
> I am having a few problems, which I have illustrated in the small test
> case below.
>
> 1) After my job finishes, data never gets committed to S3. Looking through
> the code, I've noticed that data gets flushed to disk, but the multi-part
> upload is never finished. Even though my data doesn't hit the min part
> size, I would expect that if my job ends, my data should get uploaded since
> the job is 100% done.
>
> I am also having problems when the job is running not uploading - but I
> haven't been able to distill that down to a simple test case, so I thought
> I'd start here.
>
> 2) The S3 Filesystem does not pull credentials from the Flink
> Configuration when running in embedded mode. I have a workaround for this,
> but it is ugly. If you comment out the line in the test case which talks
> about this workaround, you will end up with a "Java.net.SocketException:
> Host is down"
>
> Can anyone shed light on these two issues? Thanks!
>
> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> import org.junit.jupiter.api.Test;
>
> public class S3Test {
> @Test
> public void whyDoesntThisWork() throws Exception {
> Configuration configuration = new Configuration();
> configuration.setString("state.backend",
> MemoryStateBackendFactory.class.getName());
> configuration.setString("s3.access.key", "");
> configuration.setString("s3.secret.key", "");
>
> // If I don't do this, the S3 filesystem never gets the
> credentials
> FileSystem.initialize(configuration, null);
>
> LocalStreamEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>
> StreamingFileSink s3 = StreamingFileSink
> .forRowFormat(new Path("s3://bucket/"), new
> SimpleStringEncoder())
> .build();
>
> env.fromElements("string1", "string2")
> .addSink(s3);
>
> env.execute();
>
> System.out.println("Done");
> }
> }
>
>
> --
> Dan Diephouse
> @dandiep
>


Re: why we need keyed state and operator state when we already have checkpoint?

2020-10-07 Thread 大森林
Thanks for your replies, I think I understand now.


There are two cases.
1. If I use no keyed state in the program, when it's killed, I can only resume
from the previous result.
2. If I use keyed state in the program, when it's killed, I can resume from the
previous result and the previous temporary values of my variables.


Am I right?
Thanks for your guide.
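
For what it's worth, a minimal sketch of keyed state (class and state names are
made up): the counter below is exactly the kind of per-key "temporary variable
result" that a checkpoint snapshots and a restore brings back.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PerKeyCounter extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        // Keyed state: one Long per key, included in every checkpoint/savepoint.
        count = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        long c = count.value() == null ? 1L : count.value() + 1;
        count.update(c);
        // After a failure and restore, c continues from the checkpointed value.
        out.collect(value + " -> seen " + c + " records for this key");
    }
}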




-- Original Message --
From: "Arvid Heise"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions


On Wed, Oct 7, 2020 at 6:51 AM ??