Monitor number of keys per TaskManager

2019-10-22 Thread Flavio Pompermaier
Hi to all,
I was looking into the ClickEventCount example [1] from the Flink training
playground, trying to understand why one task manager was reading at twice
the speed of the other.

I had to debug a lot of Flink internals to understand that it comes down to
the hash function Flink uses to assign keys to task managers, which was
assigning 4 keys to one TM and 2 to the other. Is there a smarter way to
monitor this (e.g. a metric like taskManager_numKeys)?
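
For reference, the key-to-subtask mapping can be reproduced offline with
Flink's own KeyGroupRangeAssignment utility. A minimal sketch, assuming the
job keys by the page string and runs with parallelism 2 (the key values below
are placeholders, not necessarily the exact pages used by the playground job):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyDistributionCheck {
    public static void main(String[] args) {
        int parallelism = 2;
        // Default max parallelism Flink derives when none is set explicitly.
        int maxParallelism =
                KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);

        String[] keys = {"/help", "/index.html", "/shop", "/jobs", "/about", "/news"};
        for (String key : keys) {
            // Same hash -> key group -> operator index path Flink uses at runtime.
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    key, maxParallelism, parallelism);
            System.out.println(key + " -> subtask " + subtask);
        }
    }
}

Counting the keys per subtask this way shows the same skew without having to
step through Flink's internals.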

I also discovered that one cannot control how keys are partitioned across
task managers (i.e. use keyBy after a customPartition). Is there any
development effort in this direction?

Best,
Flavio

[1]
https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java


Re: Flink grpc-netty-shaded NoClassDefFoundError

2019-10-22 Thread dhanesh arole
Just to give you more context,

We are also using `com.google.cloud.bigtable` in our job dependencies. Could
it be due to a shading (shadow plugin) issue with `bigtable-hbase-2.x`?
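
One quick check is to log which classloader actually resolves the shaded
class from inside the job. A minimal sketch (the class name is taken from the
stack trace quoted below; run the same lookup from a RichFunction's open() on
the cluster to see what Flink's child-first user classloader finds):

import java.security.CodeSource;

public class ShadedClassCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        // Resolve one of the classes from the stack trace and report where it
        // was loaded from.
        Class<?> clazz = Class.forName(
                "io.grpc.netty.shaded.io.netty.channel.AbstractChannel");
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        System.out.println(clazz.getName()
                + " loaded by " + clazz.getClassLoader()
                + " from " + (source == null ? "bootstrap/unknown" : source.getLocation()));
    }
}

If the class resolves fine in a plain JVM against the fat jar but not inside
the task managers, the problem is more likely in how the jar is shaded or
picked up on the cluster than in a missing dependency.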


-
Dhanesh Arole ( Sent from mobile device. Pardon me for typos )



On Tue, Oct 22, 2019 at 2:06 PM dhanesh arole 
wrote:

> Hello all,
>
> We are running on Flink 1.6.2 and have a couple of streaming jobs running
> on it. We are intermittently seeing *java.lang.NoClassDefFoundError* with
> the stack trace below [1]. The Flink jobs were working fine, but due to
> these recent errors the task managers are now frequently crashing and
> causing the jobs to restart. We haven't changed any dependency or Flink
> version, so we are not sure if it's related to a version mismatch. This
> only happens with classes from the *io.grpc.netty.shaded* package. Even
> the classes that the Flink user-code classloader cannot find are actually
> present in the fat jar built with shadowJar (validated with *jar tf
> flink-job.jar*).
>
> Has anyone faced such an issue before? Any pointers on how to debug this
> would be really helpful.
>
> I am attaching the output of './gradlew dep' as well for reference, to
> check whether we have any wrong or conflicting dependencies.
>
> [1] Stack trace: Task manager java.lang.NoClassDefFoundError
>
> "java.lang.NoClassDefFoundError:
> io/grpc/netty/shaded/io/netty/channel/AbstractChannel$AbstractUnsafe$8
> at
> io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.deregister(AbstractChannel.java:817)
> at
> io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.fireChannelInactiveAndDeregister(AbstractChannel.java:777)
> at
> io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:760)
> at
> io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:615)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
> at
> io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
> at
> io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException:
> io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$8
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 13 more
> "
>


Re: Submitting jobs via REST

2019-10-22 Thread Timothy Victor
Thank you for your help, Pritam. I got this working now. The missing piece
for me was that I had to set "web.upload.dir", which by default is some
auto-generated directory under /tmp.

One problem I had was that the jar actually needs to be placed under
$web.upload.dir/flink-web-upload, i.e. a hardcoded subdirectory under the
user-specified dir. It would be nice if that weren't the case and Flink just
read directly from the root ($web.upload.dir). I can create a ticket for it
to see if there is any general interest in such a change.
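
For anyone hitting the same thing, the run call against a jar that is already
in place looks roughly like this. A minimal Java sketch with placeholder
host, jar name and entry class; the jar name in the URL should match the file
placed under $web.upload.dir/flink-web-upload, as described above:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class RunJarViaRest {
    public static void main(String[] args) throws Exception {
        // POST /jars/<jarid>/run on the Flink REST endpoint.
        URL url = new URL("http://localhost:8081/jars/my-job.jar/run");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        // Optional run parameters (entry class, program arguments, parallelism, ...).
        String body = "{\"entryClass\":\"com.example.MyJob\",\"programArgs\":\"--input foo\"}";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("HTTP " + conn.getResponseCode());
        try (Scanner scanner = new Scanner(conn.getInputStream(), StandardCharsets.UTF_8.name())) {
            scanner.useDelimiter("\\A");
            System.out.println(scanner.hasNext() ? scanner.next() : "");
        }
    }
}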

Thanks again for your help

Tim

On Mon, Oct 21, 2019, 10:37 AM Pritam Sadhukhan 
wrote:

> Can you please share your dockerfile?
> Please upload your jar at /opt/flink/product-libs/flink-web-upload/.
>
> Regards,
> Pritam.
>
> On Mon, 21 Oct 2019 at 19:58, Timothy Victor  wrote:
>
>> Thanks Pritam.
>>
>> Unfortunately this does not work for me.  I get a response that says
>> "jar file /tmp/flink-web-/flink-web-upload/ does not exist".
>>
>> It is looking for the jar in the tmp folder. I wonder if there is a way to
>> change that so that it looks in the right folder.
>>
>> Thanks
>>
>> Tim
>>
>> On Sun, Oct 20, 2019, 7:55 AM Pritam Sadhukhan <
>> sadhukhan.pri...@gmail.com> wrote:
>>
>>> Hi Tim,
>>>
>>> I have a similar scenario where I have embedded my jar within the
>>> image.
>>>
>>> I used the following command to submit the job :
>>>
>>> curl -X POST  http://localhost:8081/jars/.jar/run
>>> 
>>>
>>> with the request parameters if any.
>>>
>>>
>>> Please let me know if this helps.
>>>
>>>
>>> Regards,
>>>
>>> Pritam.
>>>
>>> On Sat, 19 Oct 2019 at 20:06, Timothy Victor  wrote:
>>>
 I have a Flink docker image with my job's JAR already contained
 within. I would like to run a job with this jar via the REST API. Is that
 possible?

 I know I can run a job via REST using the JarID (the ID assigned by Flink
 when a jar is uploaded). However, I don't have such an ID since this jar is
 already part of the image.

 Via the CLI I can start a job using the classpath. But can I do the same via
 the REST API? Any other ways to achieve this?

 Thanks

 Tim

>>>


Re: Comparing Storm and Flink resource requirements

2019-10-22 Thread vino yang
Hi Gyula,

Based on our previous experience of switching from Storm to Flink: for the
same business, resources of the same size are completely sufficient, and
the performance indicators are slightly better than with Storm. As you said,
this may be related to using some of Flink's special features like state.
Our state was small at the time, and the main business was real-time ETL.
For a different type of business, the picture may be more complicated and
may require case-by-case analysis.

Best,
Vino

Gyula Fóra wrote on Mon, Oct 21, 2019 at 8:15 PM:

> Hi All!
>
> I would like to ask the community for any experience regarding migration
> from Storm to Flink production applications.
>
> Specifically I am interested in your experience related to the resource
> requirements for the same pipeline as implemented in Flink vs in Storm. The
> design of the applications might be significantly different for the 2
> systems (state, added complexity by new features enabled) so I don't expect
> precise numbers, estimates would be completely fine :)
>
> I understand that this is a quite complex question and I would really
> appreciate any information you can provide.
>
> Thank you all!
> Gyula
>


Re: Comparing Storm and Flink resource requirements

2019-10-22 Thread Gyula Fóra
Thanks Vino, this is very helpful.

At the end of the day we are looking for something like:
a job that requires N nodes in Storm only requires K nodes when
implemented in Flink, where hopefully K < N :)

Cheers,
Gyula

On Tue, Oct 22, 2019 at 2:31 PM vino yang  wrote:

> Hi Gyula,
>
> Based on our previous experience of switching from Storm to Flink: for the
> same business, resources of the same size are completely sufficient, and
> the performance indicators are slightly better than with Storm. As you said,
> this may be related to using some of Flink's special features like state.
> Our state was small at the time, and the main business was real-time ETL.
> For a different type of business, the picture may be more complicated and
> may require case-by-case analysis.
>
> Best,
> Vino
>
> Gyula Fóra wrote on Mon, Oct 21, 2019 at 8:15 PM:
>
>> Hi All!
>>
>> I would like to ask the community for any experience regarding migration
>> from Storm to Flink production applications.
>>
>> Specifically I am interested in your experience related to the resource
>> requirements for the same pipeline as implemented in Flink vs in Storm. The
>> design of the applications might be significantly different for the 2
>> systems (state, added complexity by new features enabled) so I don't expect
>> precise numbers, estimates would be completely fine :)
>>
>> I understand that this is a quite complex question and I would really
>> appreciate any information you can provide.
>>
>> Thank you all!
>> Gyula
>>
>


Re: Issue with BulkWriter

2019-10-22 Thread amran dean
Hello,
These changes result in the following error:
$ lzop -d part-1-0
lzop: part-1-0: not a lzop file


public class BulkRecordLZOSerializer implements BulkWriter<KafkaRecord> {

    private final CompressionOutputStream compressedStream;

    public BulkRecordLZOSerializer(OutputStream stream) {
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        try {
            compressedStream = factory
                    .getCodecByClassName("com.hadoop.compression.lzo.LzoCodec")
                    .createOutputStream(stream);
        } catch (IOException e) {
            throw new IllegalStateException("Unable to create LZO OutputStream");
        }
    }

    public void addElement(KafkaRecord record) throws IOException {
        compressedStream.write(record.getValue());
        compressedStream.write('\n');
    }

    public void finish() throws IOException {
        compressedStream.flush();
        compressedStream.finish();
    }

    public void flush() throws IOException {
        compressedStream.flush();
    }
}
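
For context, a writer like this is handed its output stream by the
StreamingFileSink through a BulkWriter.Factory. A minimal wiring sketch of my
own (the output path is a placeholder; assumes the class above and a recent
Flink version):

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class LzoSinkFactory {

    // Builds a StreamingFileSink that hands each part file's output stream to
    // a fresh BulkRecordLZOSerializer.
    public static StreamingFileSink<KafkaRecord> build() {
        BulkWriter.Factory<KafkaRecord> writerFactory =
                stream -> new BulkRecordLZOSerializer(stream);
        return StreamingFileSink
                .forBulkFormat(new Path("/tmp/lzo-output"), writerFactory)
                .build();
    }
}

Note that with bulk formats the sink rolls and finalizes part files on
checkpoints, so checkpointing has to be enabled for files to be completed.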


On Mon, Oct 21, 2019 at 11:17 PM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi,
>
> Seems like that you want to use "com.hadoop.compression.lzo.LzoCodec"
> instead of "com.hadoop.compression.lzo.*LzopCodec*" in the below line.
>
> compressedStream = 
> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>
>
> Regarding "lzop: unexpected end of file" problem, kindly add
> "compressedStream.flush()" in the below method to flush any leftover data
> before finishing.
>
> public void finish() throws IOException {
>   compressedStream.flush();
>   compressedStream.finish();
> }
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
>
> Regards,
> Ravi
>
> On Tue, Oct 22, 2019 at 4:10 AM amran dean  wrote:
>
>> Hello,
>> I'm using BulkWriter to write newline-delimited, LZO-compressed files.
>> The logic is very straightforward (See code below).
>>
>> I am experiencing an issue decompressing the files created in this
>> manner, consistently getting "lzop: unexpected end of file". Is this
>> an issue with the caller of BulkWriter?
>>
>> (As an aside, using com.hadoop.compression.lzo.LzoCodec instead results
>> in gibberish.) I'm very confused about what is going on.
>>
>> private final CompressionOutputStream compressedStream;
>>
>> public BulkRecordLZOSerializer(OutputStream stream) {
>> CompressionCodecFactory factory = new CompressionCodecFactory(new 
>> Configuration());
>> try {
>> compressedStream = 
>> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>> } catch (IOException e) {
>> throw new IllegalStateException("Unable to create LZO OutputStream");
>> }
>> }
>>
>> public void addElement(KafkaRecord record) throws IOException {
>> compressedStream.write(record.getValue());
>> compressedStream.write('\n');
>> }
>>
>> public void finish() throws IOException {
>> compressedStream.finish();
>> }
>>
>> public void flush() throws IOException {
>> compressedStream.flush();
>> }
>>
>>


RE: Could not load the native RocksDB library

2019-10-22 Thread Thad Truman
Hi Samya,

Were you able to get this resolved? Seeing the same issue here after upgrading 
to Flink 1.9 from 1.6.
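
In case it helps while debugging: whether the bundled native library can be
loaded at all on a given host can be checked outside of Flink with a tiny
standalone program. A minimal sketch, assuming a rocksdbjni (or Flink's
frocksdbjni) jar is on the classpath:

import org.rocksdb.RocksDB;

public class RocksDbLoadCheck {
    public static void main(String[] args) {
        try {
            // Extracts librocksdbjni from the jar into a temp dir and loads it,
            // which is essentially what the RocksDB state backend does.
            RocksDB.loadLibrary();
            System.out.println("Native RocksDB library loaded successfully");
        } catch (Throwable t) {
            // An UnsatisfiedLinkError here (e.g. an undefined symbol such as
            // malloc_stats_print, as in the trace below) points at the host
            // environment rather than the Flink configuration.
            t.printStackTrace();
        }
    }
}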

Thanks,

Thad

From: Andrey Zagrebin 
Sent: Wednesday, July 3, 2019 9:09 AM
To: Haibo Sun 
Cc: Patro, Samya ; user@flink.apache.org; Bari, Swapnil 

Subject: Re: Could not load the native RocksDB library

Hi Samya,

Additionally to Haibo's answer:
Have you tried the previous Flink version, 1.7? The RocksDB version was
upgraded in Flink 1.8.

Best,
Andrey

On Wed, Jul 3, 2019 at 5:21 AM Haibo Sun <sunhaib...@163.com> wrote:
Hi Samya,

I guess this may be a setup problem. Which OS and which version of the JDK do
you use? You can try upgrading the JDK to see if that solves the issue.

Best,
Haibo

At 2019-07-02 17:16:59, "Patro, Samya" <samya.pa...@gs.com> wrote:

Hello,
I am using RocksDB for storing state. But when I run the pipeline I get the
error "Could not load the native RocksDB library". Could you kindly check the
configs and the error stack trace below and suggest what I am doing wrong?

Flink version  - 1.8.0



<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.8.0</version>
</dependency>


This is the Flink checkpointing config I have used:

executionEnvironment.enableCheckpointing(30);
executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5);
executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60);
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
StateBackend rocksDbBackend = new 
RocksDBStateBackend(parameter.get("stateBackendPath"),true);
executionEnvironment.setStateBackend(rocksDbBackend);

When I run the pipeline, I get this error

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for StreamFlatMap_9dd63673dd41ea021b896d5203f3ba7c_(1/5) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 5 more
Caused by: java.io.IOException: Could not load the native RocksDB library
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:911)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:482)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 7 more
Caused by: java.lang.UnsatisfiedLinkError: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
/tmp/rocksdb-lib-ee961b2f013f7d5baabbc1cb2b0b87d7/librocksdbjni-linux64.so: 
undefined symbol: malloc_stats_print
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at 
org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:888)
... 11 more


Thanks and  Regards,
Samya Ranjan Patro
Goldman sachs


Re: Issue with BulkWriter

2019-10-22 Thread Ravi Bhushan Ratnakar
Hi,

If possible, kindly share one output file to inspect. In the meanwhile, you
could also give "org.apache.hadoop.io.compress.GzipCodec" a try.
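
Concretely, that swap would only change the codec lookup. An untested sketch
of the same pattern with Hadoop's built-in Gzip codec:

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class GzipBulkStreamFactory {

    // Untested sketch: same pattern as the LZO writer below, with the codec
    // class name swapped to Hadoop's built-in GzipCodec.
    public static CompressionOutputStream wrap(OutputStream stream) throws IOException {
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        return factory
                .getCodecByClassName("org.apache.hadoop.io.compress.GzipCodec")
                .createOutputStream(stream);
    }
}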

Regards,
Ravi

On Tue, Oct 22, 2019 at 7:25 PM amran dean  wrote:

>
> Hello,
> These changes result in the following error:
> $ lzop -d part-1-0
> lzop: part-1-0: not a lzop file
>
>
> public class BulkRecordLZOSerializer implements BulkWriter<KafkaRecord> {
>
> private final CompressionOutputStream compressedStream;
>
> public BulkRecordLZOSerializer(OutputStream stream) {
> CompressionCodecFactory factory = new CompressionCodecFactory(new 
> Configuration());
> try {
> compressedStream = 
> factory.getCodecByClassName("com.hadoop.compression.lzo.LzoCodec").createOutputStream(stream);
> } catch (IOException e) {
> throw new IllegalStateException("Unable to create LZO 
> OutputStream");
> }
> }
>
> public void addElement(KafkaRecord record) throws IOException {
> compressedStream.write(record.getValue());
> compressedStream.write('\n');
> }
>
> public void finish() throws IOException {
> compressedStream.flush();
> compressedStream.finish();
> }
>
> public void flush() throws IOException {
> compressedStream.flush();
> }
> }
>
>
> On Mon, Oct 21, 2019 at 11:17 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi,
>>
>> Seems like that you want to use "com.hadoop.compression.lzo.LzoCodec"
>> instead of "com.hadoop.compression.lzo.*LzopCodec*" in the below line.
>>
>> compressedStream = 
>> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>>
>>
>> Regarding "lzop: unexpected end of file" problem, kindly add
>> "compressedStream.flush()" in the below method to flush any leftover data
>> before finishing.
>>
>> public void finish() throws IOException {
>>   compressedStream.flush();
>>   compressedStream.finish();
>> }
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
>>
>> Regards,
>> Ravi
>>
>> On Tue, Oct 22, 2019 at 4:10 AM amran dean 
>> wrote:
>>
>>> Hello,
>>> I'm using BulkWriter to write newline-delimited, LZO-compressed files.
>>> The logic is very straightforward (See code below).
>>>
>>> I am experiencing an issue decompressing the files created in this
>>> manner, consistently getting "lzop: unexpected end of file". Is this
>>> an issue with the caller of BulkWriter?
>>>
>>> (As an aside, using com.hadoop.compression.lzo.LzoCodec instead results
>>> in gibberish.) I'm very confused about what is going on.
>>>
>>> private final CompressionOutputStream compressedStream;
>>>
>>> public BulkRecordLZOSerializer(OutputStream stream) {
>>> CompressionCodecFactory factory = new CompressionCodecFactory(new 
>>> Configuration());
>>> try {
>>> compressedStream = 
>>> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>>> } catch (IOException e) {
>>> throw new IllegalStateException("Unable to create LZO 
>>> OutputStream");
>>> }
>>> }
>>>
>>> public void addElement(KafkaRecord record) throws IOException {
>>> compressedStream.write(record.getValue());
>>> compressedStream.write('\n');
>>> }
>>>
>>> public void finish() throws IOException {
>>> compressedStream.finish();
>>> }
>>>
>>> public void flush() throws IOException {
>>> compressedStream.flush();
>>> }
>>>
>>>