Re: ProcessFunction's Event Timer not firing

2018-11-10 Thread Fritz Budiyanto
Thanks Hequn for the pointer.

From what I have read, I may also need to emit watermarks periodically for all 
idle partitions so that the overall watermark keeps progressing.

—
Fritz

> On Nov 8, 2018, at 6:02 PM, Hequn Cheng wrote:
> 
> Hi Fritz,
> 
> Watermarks are merged on stream shuffles. If the watermark of one input is 
> not progressing, event time will not advance at the downstream operators. I 
> think you should decrease the parallelism of the source and make sure there 
> is data in each of your source partitions. 
> Note that the Kafka source supports per-partition watermarking, which you can 
> read more about here [1].
> 
> Best, Hequn
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
> 
> On Fri, Nov 9, 2018 at 1:56 AM Fritz Budiyanto wrote:
> Hi All,
> 
> I noticed that if the watermark of one slot is not progressing, it impacts 
> the ProcessFunction timers of all slots, and no timers fire. 
> 
> In my example, the source parallelism is set to 8 and the number of Kafka 
> partitions is 4. The next operator is a ProcessFunction with a parallelism 
> of 8 plus an event-time timer. I can see from the debug log that the 
> watermark of one of the slots is not progressing. As a result, the timers in 
> the process function do not fire in any slot. Is this expected behavior or a 
> bug? How do I prevent this condition?
> 
> Thanks,
> Fritz
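
Editor's illustration of the behavior discussed in this thread: a minimal 
sketch in plain Java, not Flink's actual implementation. It assumes the 
simplified rule that an operator's event-time clock is the minimum of the 
latest watermarks on its input channels, and that an event-time timer fires 
once that clock reaches the timer's timestamp. The class and method names 
(WatermarkMergeSketch, onWatermark, currentWatermark, timerFires) are 
hypothetical and exist only for this example.

```java
import java.util.Arrays;

/**
 * Toy model, NOT Flink code: an operator with several input channels keeps
 * the latest watermark per channel and uses the minimum as its own clock.
 */
final class WatermarkMergeSketch {

    // Latest watermark seen per input channel; Long.MIN_VALUE means "none yet".
    private final long[] channelWatermarks;

    // Assumes numInputChannels >= 1.
    WatermarkMergeSketch(int numInputChannels) {
        channelWatermarks = new long[numInputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; watermarks never move backwards. */
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** The operator's event time is the minimum over all input channels. */
    long currentWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    /** A timer fires once the operator's watermark reaches its timestamp. */
    boolean timerFires(long timerTimestamp) {
        return currentWatermark() >= timerTimestamp;
    }

    public static void main(String[] args) {
        // 8 source subtasks feed the operator, but only 4 of them read a
        // Kafka partition, so the other 4 never emit a watermark.
        WatermarkMergeSketch operator = new WatermarkMergeSketch(8);
        for (int ch = 0; ch < 4; ch++) {
            operator.onWatermark(ch, 1_000L);
        }
        System.out.println("4 of 8 inputs advanced: " + operator.timerFires(500L));

        // Once the remaining inputs also advance, the minimum moves forward.
        for (int ch = 4; ch < 8; ch++) {
            operator.onWatermark(ch, 1_000L);
        }
        System.out.println("8 of 8 inputs advanced: " + operator.timerFires(500L));
    }
}
```

In this model the first check reports false and the second reports true. This 
matches the two remedies mentioned in the thread: either reduce the source 
parallelism so that every subtask has data, or make sure idle inputs still 
advance their watermark.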



How to use multiple sources with multiple sinks

2018-11-10 Thread Flink Developer
How can I configure one Flink job (stream execution environment, parallelism 
set to 10) to have multiple Kafka sources, each with its own sink to S3?

For example, let's say the sources are:

- Kafka Topic A - Consumer (10 partitions)
- Kafka Topic B - Consumer (10 partitions)
- Kafka Topic C - Consumer (10 partitions)

And let's say the sinks are:

- BucketingSink to S3 in bucket: s3://kafka_topic_a/
- BucketingSink to S3 in bucket: s3://kafka_topic_b/
- BucketingSink to S3 in bucket: s3://kafka_topic_c/

Between source 1 and sink 1, I would like to perform processing that is 
specific to that pipeline. Likewise, source 2 to sink 2 and source 3 to 
sink 3 should each have their own processing.

How can this be achieved? Is there an example?

Re: java.io.IOException: NSS is already initialized

2018-11-10 Thread Hao Sun
Hi Ufuk, thanks for checking. I am using OpenJDK 1.8_171, and I still have
the same issue with Presto.

- Why is the checkpoint numbering not starting from 1? Old checkpoints stored
in ZK caused it. I cleaned them up, but that did not help much.
- I switched to Flink + Hadoop28 and used Hadoop S3, with no other changes;
checkpointing works with the Hadoop flavour.

On Fri, Nov 9, 2018 at 2:02 PM Ufuk Celebi wrote:

> Hey Hao Sun,
>
> - Is this an intermittent failure or permanent? The logs indicate that
> some checkpoints completed before the error occurs (e.g. checkpoint
> numbers are greater than 1).
>
> - Which Java versions are you using? And which Java image? I've
> Googled similar issues that seem to be related to the JVM, e.g. [1].
>
> Best,
>
> Ufuk
>
> [1]
> https://dev.lucee.org/t/could-not-initialize-class-sun-security-ssl-sslcontextimp/3972
> 
>
>
> On Thu, Nov 8, 2018 at 8:55 PM Hao Sun wrote:
> >
> > Thanks, any insight/help here is appreciated.
> >
> > On Thu, Nov 8, 2018 at 4:38 AM Dawid Wysakowicz 
> wrote:
> >>
> >> Hi Hao,
> >>
> >> I am not sure what might be wrong, but I've cc'ed Gary and Kostas, who
> were recently working with S3; maybe they will have some ideas.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 03/11/2018 03:09, Hao Sun wrote:
> >>
> >> Same environment, new error.
> >>
> >> I can run the same Docker image on my local Mac, but on K8S it
> gives me this error.
> >> I cannot think of any difference between local Docker and K8S Docker.
> >>
> >> Any hint will be helpful. Thanks
> >>
> >> 
> >>
> >> 2018-11-02 23:29:32,981 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
> ConnectedStreams maxwell.accounts ()
> switched from state RUNNING to FAILING.
> >> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 235 for operator Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).}
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> >> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: java.lang.Exception: Could not materialize checkpoint 235
> for operator Source: KafkaSource(maxwell.accounts) ->
> MaxwellFilter->Maxwell(maxwell.accounts) ->
> FixedDelayWatermark(maxwell.accounts) ->
> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink:
> influxdbSink(maxwell.accounts) (1/1).
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> >> ... 6 more
> >> Caused by: java.util.concurrent.ExecutionException:
> java.lang.NoClassDefFoundError: Could not initialize class
> sun.security.ssl.SSLSessionImpl
> >> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> >>
> >> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> >>
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> >> ... 5 more
> >> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> sun.security.ssl.SSLSessionImpl
> >> at sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604)
> >>
> >> at sun.security.ssl.SSLSocketImpl.<init>(SSLSocketImpl.java:572)
> >>
> >> at
> sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:110)
> >> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:365)
> >> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
> >> at
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
> >> at
> org.apache.flink.fs.s3presto.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOpe