RE: kafka consumer parallelism

2017-10-03 Thread Sofer, Tovi
Hi Robert,

I had a similar issue.
For me the problem was that the topic was auto-created with one partition.
You can alter it to have 5 partitions using the kafka-topics command.
Example: 
kafka-topics --alter  --partitions 5 --topic fix --zookeeper localhost:2181 
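To verify the change took effect, the same tool can describe the topic (assuming the same Zookeeper address):
kafka-topics --describe --topic fix --zookeeper localhost:2181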

Regards,
Tovi
-Original Message-
From: Timo Walther [mailto:twal...@apache.org] 
Sent: Monday, 02 October 2017 20:59
To: user@flink.apache.org
Subject: Re: kafka consumer parallelism

Hi,

I'm not a Kafka expert, but I think you need more than one Kafka partition
to process multiple documents at the same time. Also make sure to send the
documents to different partitions.
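As a hedged illustration of what "sending to different partitions" looks like from the producer side, here is a minimal sketch using the plain Kafka producer API; the broker address is an assumption and the topic name "fix" is borrowed from Tovi's example. The default partitioner hashes the record key, so distinct keys typically land on different partitions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 3; i++) {
                // distinct keys are hashed by the default partitioner, so these
                // records typically spread across partitions (given enough of them)
                producer.send(new ProducerRecord<>("fix", "doc-" + i, "document " + i));
            }
        }
    }
}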

Regards,
Timo


On 10/2/17 at 6:46 PM, r. r. wrote:
> Hello
> I'm running a job with "flink run -p5" and additionally set 
> env.setParallelism(5).
> The source of the stream is Kafka; the job uses FlinkKafkaConsumer010.
> In the Flink UI, though, I notice that if I send 3 documents to Kafka, only one 
> 'instance' of the consumer seems to receive Kafka's records and send them to the 
> next operators, which according to the Flink UI are properly parallelized.
> What's the explanation of this behavior?
> According to sources:
>
> To enable parallel execution, the user defined source should
>       * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend
>       * {@link org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}
> which FlinkKafkaConsumer010 does
>
> Please check a screenshot at https://imgur.com/a/E1H9r ; you'll see that only one sends 3 
> records to the sinks
>
> My code is here: https://pastebin.com/yjYCXAAR
>
> Thanks!




Re: Stream Task seems to be blocked after checkpoint timeout

2017-10-03 Thread Stefan Richter
Hi,

from the stack trace, it seems to me like closing the checkpoint output stream 
to S3 is the culprit:

"pool-55-thread-7" #458829 prio=5 os_prio=0 tid=0x7fda180c4000 nid=0x55a2 
waiting on condition [0x7fda092d7000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0007154050b8> (a 
java.util.concurrent.FutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at 
com.amazonaws.services.s3.transfer.internal.UploadImpl.waitForUploadResult(UploadImpl.java:66)
at 
org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:131)
- locked <0x0007154801d0> (a 
org.apache.hadoop.fs.s3a.S3AOutputStream)
at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at 
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at 
org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:48)
at 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
- locked <0x000715480238> (a 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
- locked <0x00073ef55b00> (a 
org.apache.flink.runtime.util.SerializableObject)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
at 
org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

In particular, this holds lock 0x00073ef55b00, which blocks the next 
checkpoint in its synchronous phase:

"count-with-timeout-window -> s3-uploader -> Sink: meta-store-committer (7/12)" 
#454093 daemon prio=5 os_prio=0 tid=0x7fda2804 nid=0x2f3b waiting for 
monitor entry [0x7fda0a5e8000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshotFully(RocksDBKeyedStateBackend.java:379)
- waiting to lock <0x00073ef55b00> (a 
org.apache.flink.runtime.util.SerializableObject)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:317)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
- locked <0x00073ee55068> (a java.lang.Object)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
at 
org.apache.flink.stream

Re: At end of complex parallel flow, how to force end step with parallel=1?

2017-10-03 Thread Gábor Gévay
Hi Garrett,

You can call .setParallelism(1) on just this operator:

ds.reduceGroup(new GroupReduceFunction...).setParallelism(1)

Best,
Gabor
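
A minimal runnable sketch of this suggestion (the input data here is made up); the job keeps its wide default parallelism while only the reduceGroup runs as a single instance:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class GlobalReduceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(90); // job-wide default, as in the original question

        DataSet<String> ds = env.fromElements("a", "b", "c"); // stand-in for the real data

        ds.reduceGroup(new GroupReduceFunction<String, String>() {
                @Override
                public void reduce(Iterable<String> values, Collector<String> out) {
                    // with parallelism 1, this single call sees every record
                    StringBuilder all = new StringBuilder();
                    for (String value : values) {
                        all.append(value);
                    }
                    out.collect(all.toString());
                }
            })
            .setParallelism(1) // only this operator is forced to one parallel instance
            .print();          // print() triggers execution for the DataSet API
    }
}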



On Mon, Oct 2, 2017 at 3:46 PM, Garrett Barton  wrote:
> I have a complex algorithm implemented using the DataSet API, and by default it
> runs with parallelism 90 for good performance. At the end I want to perform a
> clustering of the resulting data, and to do that correctly I need to pass all
> the data through a single thread/process.
>
> I read in the docs that as long as I did a global reduce using
> DataSet.reduceGroup(new GroupReduceFunction) it would be forced to a
> single thread.  Yet when I run the flow and bring it up in the UI, I see
> parallelism 90 all the way through the DAG, including this one.
>
> Is there a config or feature to force the flow back to a single thread?  Or
> should I just split this into two completely separate jobs?  I'd rather not
> split, as I would like to use Flink's ability to iterate on this algorithm and
> cluster combo.
>
> Thank you


Re: Stream Task seems to be blocked after checkpoint timeout

2017-10-03 Thread Tony Wei
Hi Stefan,

Thank you very much. I will try to investigate what the problem is on my
cluster and S3.
BTW, is there any Jira issue associated with your improvement, so that I
can track it?

Best Regards,
Tony Wei


Re: kafka consumer parallelism

2017-10-03 Thread Carst Tankink
(Accidentally sent this to Timo instead of to-list...)

Hi,

What Timo says is true, but in case you have a higher parallelism than the number 
of partitions (because you want to make use of it in a downstream operator), you 
could do a .rebalance() (see 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#physical-partitioning)
 after the Kafka source.
This makes sure that all operators after the Kafka source get an even load, at 
the cost of having to redistribute the documents (so there is de/serialization 
+ network overhead).
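
To make the .rebalance() suggestion concrete, a minimal sketch under assumed names (broker address, group id, and the map step are all hypothetical); the consumer still reads the single partition with one subtask, but everything after rebalance() runs with the full parallelism:

import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class RebalancedKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
        props.setProperty("group.id", "doc-consumer");            // assumption

        DataStream<String> docs = env.addSource(
                new FlinkKafkaConsumer010<>("fix", new SimpleStringSchema(), props));

        docs.rebalance() // round-robins records to all 5 downstream subtasks,
                         // even if the topic has a single partition
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String doc) {
                    return doc.toUpperCase(); // stand-in for the real per-document work
                }
            })
            .print();

        env.execute("rebalanced-kafka-sketch");
    }
}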


Carst




Re: Sink buffering

2017-10-03 Thread nragon
Anyone? :)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Fw: Question on Flink on Window

2017-10-03 Thread Tay Zhen Shen




From: Tay Zhen Shen
Sent: Tuesday, 3 October, 2017 1:03 PM
To: user@flink.apache.org
Subject: Question on Flink on Window


Hi,


I'm currently trying to set up Flink 1.3.2 on my Windows 10 machine. When I ran 
the start-local.bat file, the command prompt showed that I have missing files. I 
tracked everything down and found that start-local.bat contains configurations 
which point to bin/../lib. I searched through the files downloaded from the site 
and couldn't find any lib directory. How can I solve this?


Thank you.




Regards,

Tay Zhen Shen

Student of USM (Malaysia)


Custom Metrics Reporting in Flink

2017-10-03 Thread Rahul Raj
Hi,

Is there any good example of custom metrics reporting in Flink? I tried to
follow the documentation but failed to achieve the result. Basically, my
task is to identify corrupt records based on a missing field and report
them via the statsd reporter.


Rahul Raj


Re: Stream Task seems to be blocked after checkpoint timeout

2017-10-03 Thread Stefan Richter
Sure, I opened Jira FLINK-7757 and this PR: 
https://github.com/apache/flink/pull/4764

Best,
Stefan


javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated for S3 access

2017-10-03 Thread Hao Sun
I am using S3 for checkpointing, with externalized checkpoints as well.

s3a://bucket/checkpoints/e58d369f5a181842768610b5ab6a500b
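
(For context, checkpointing to S3 like this is usually wired up in flink-conf.yaml along these lines; the keys below are the Flink 1.3 names, and the backend choice is an assumption:)

state.backend: rocksdb                             # or "filesystem"; assumption
state.backend.fs.checkpointdir: s3a://bucket/checkpoints
state.checkpoints.dir: s3a://bucket/checkpoints    # externalized checkpoint metadata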


I have this exception and am not sure what I can do with it.
I guess I need to configure Hadoop to use some SSLFactory?

I am not running Hadoop itself; I am on Kubernetes (in AWS) with S3.


Thanks!

= Logs =

2017-10-03 17:52:27,452 INFO  com.amazonaws.http.AmazonHttpClient  - Unable to execute HTTP request: The target server failed to respond
org.apache.http.NoHttpResponseException: The target server failed to respond
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:95)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:62)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:254)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:289)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:252)
at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:191)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:300)
at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:127)
at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:715)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:520)
at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:384)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:859)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:228)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:203)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1294)
at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:323)
at org.apache.flink.runtime.state.JavaSerializer.serialize(JavaSerializer.java:70)
at org.apache.flink.runtime.state.JavaSerializer.serialize(JavaSerializer.java:33)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.write(DefaultOperatorStateBackend.java:463)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:263)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
at org.apache.flink.runtime.i

Re: At end of complex parallel flow, how to force end step with parallel=1?

2017-10-03 Thread Garrett Barton
Gábor,
Thank you for the reply. I gave that a go and the flow still showed
parallelism 90 for each step.  Is the UI not 100% accurate, perhaps?

To get around it for now I implemented a partitioner that threw all the
data to the same partition; a hack, but it works!
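
For readers who want the same workaround, a minimal sketch of such a partitioner (the data set, types, and the mapPartition step are made up for illustration):

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SinglePartitionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Double>> ds = env.fromElements(
                Tuple2.of("a", 1.0), Tuple2.of("b", 2.0), Tuple2.of("c", 3.0));

        ds.partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        return 0; // route every record to the same partition
                    }
                }, 0) // partition on tuple field 0
          .mapPartition(new MapPartitionFunction<Tuple2<String, Double>, String>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<String, Double>> values,
                                             Collector<String> out) {
                        // with the partitioner above, one subtask sees all records here
                        for (Tuple2<String, Double> v : values) {
                            out.collect(v.f0 + "=" + v.f1);
                        }
                    }
                })
          .print(); // print() triggers execution for the DataSet API
    }
}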



Re: Custom Metrics Reporting in Flink

2017-10-03 Thread Chesnay Schepler
This doesn't sound like a proper use case for the metric system, but for 
side outputs with a dedicated sink.

My reasoning is that it sounds like you want to push data, whereas the 
metrics work by drawing a snapshot of all metrics and writing them out, 
i.e. one value per metric.
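
To make that concrete, a minimal sketch of the side-output approach (available since Flink 1.3); the "missing field" check and the input data are made up, and the diverted stream could be wired to a statsd-reporting sink or any other sink:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class CorruptRecordSplitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // the OutputTag must be an anonymous subclass so the type information is kept
        final OutputTag<String> corruptTag = new OutputTag<String>("corrupt-records") {};

        DataStream<String> input = env.fromElements("good,1", "bad", "good,2"); // stand-in input

        SingleOutputStreamOperator<String> validated = input
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String record, Context ctx, Collector<String> out) {
                        if (!record.contains(",")) { // hypothetical "missing field" check
                            ctx.output(corruptTag, record); // divert corrupt records
                        } else {
                            out.collect(record);
                        }
                    }
                });

        validated.getSideOutput(corruptTag).print(); // or addSink(...) to wherever they should go
        validated.print();

        env.execute("side-output-sketch");
    }
}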




[no subject]

2017-10-03 Thread Aniket Deshpande
-- 
Yours Sincerely,
Aniket S Deshpande.


Classloader error after SSL setup

2017-10-03 Thread Aniket Deshpande
Background: We have a setup of Flink 1.3.1 along with a secure MapR
cluster (Flink is running on MapR client nodes). We run this Flink cluster
via the flink-jobmanager.sh foreground and flink-taskmanager.sh foreground commands,
launched through Marathon.  In order for us to make this work, we had to add
-Djavax.net.ssl.trustStore="$JAVA_HOME/jre/lib/security/cacerts" in flink-console.sh as an
extra JVM arg (otherwise, Flink was taking MapR's ssl_truststore as the default
truststore and we were then facing issues with any 3rd-party jars like
aws_sdk etc.). This entire setup was working fine as it was, and we could
submit our jars and the pipelines ran without any problem.


Problem: We started experimenting with enabling SSL for all Flink communication.
For this, we followed
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/security-ssl.html
for generating the CA and keystore. I added the following properties to
flink-conf.yaml:


security.ssl.enabled: true
security.ssl.keystore: /opt/flink/certs/node1.keystore
security.ssl.keystore-password: 
security.ssl.key-password: 
security.ssl.truststore: /opt/flink/certs/ca.truststore
security.ssl.truststore-password: 
jobmanager.web.ssl.enabled: true
taskmanager.data.ssl.enabled: true
blob.service.ssl.enabled: true
akka.ssl.enabled: true


We then spun up a cluster and tried submitting the same job which was
working before. We get the following errors:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:229)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:230)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)


This error disappears when we remove the SSL config properties, i.e. run the
Flink cluster without SSL enabled.


So, did we miss any steps for enabling SSL?


P.S.: We tried removing the extra JVM arg mentioned above, but still get
the same error.

-- 

Aniket