Re: hadoopcompatibility not in dist

2017-04-07 Thread Petr Novotnik
Hey Fabi,

many thanks for your clarifications! It seems flink-shaded-hadoop2
itself is already included in the binary distribution:

> $ jar tf flink-1.2.0/lib/flink-dist_2.10-1.2.0.jar | grep org/apache/hadoop | head -n3
> org/apache/hadoop/
> org/apache/hadoop/fs/
> org/apache/hadoop/fs/FileSystem$Statistics$StatisticsAggregator.class

That's why adding just the hadoop-compatibility jar fixed the problem
for me. I'm not yet familiar with how Flink handles class loading, but at
first look into `TypeExtractor` I was surprised to see it _not_ using the
thread's current context class loader [1] (with a fallback to its own
class loader). This led me to investigate the jars' contents and find the
problem. I'll open a JIRA ticket for this issue on Monday.
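
For illustration, here is a minimal sketch of the lookup I have in mind:
try the thread's context class loader first, then fall back to the defining
class loader. This is just my own sketch, not Flink's actual code; the
className parameter stands in for HADOOP_WRITABLE_TYPEINFO_CLASS.

import org.apache.flink.api.java.typeutils.TypeExtractor;

final class ClassLoadingSketch {
    // Sketch only: prefer the thread's context class loader, then fall back
    // to the class loader that defined TypeExtractor itself.
    static Class<?> loadTypeInfoClass(String className) throws ClassNotFoundException {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        if (contextLoader != null) {
            try {
                return Class.forName(className, false, contextLoader);
            } catch (ClassNotFoundException e) {
                // not visible to the context class loader; fall back below
            }
        }
        return Class.forName(className, false, TypeExtractor.class.getClassLoader());
    }
}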

Have a nice weekend,
P.

[1]
> http://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader



On 04/07/2017 09:24 PM, Fabian Hueske wrote:
> Hi Petr,
> 
> I think that's expected behavior because the exception is intercepted
> and enriched with an instruction to solve the problem.
> As you assumed, you need to add the flink-hadoop-compatibility JAR file
> to the ./lib folder. Unfortunately, the file is not included in the
> binary distribution.
> You can either build it from source or manually download it from a
> public Maven repository. You might need to add the flink-shaded-hadoop2
> jar file as well, which is a dependency of flink-hadoop-compatibility.
> 
> I think we should make that easier for users and add a pre-built jar
> file to the ./opt folder of the binary distribution.
> Would you mind opening a JIRA for this?
> 
> Now a bit of background on why we moved the TypeInfo to
> flink-hadoop-compatibility. We are preparing Flink's core to become
> independent of Hadoop, i.e., Flink core should not require Hadoop. We
> will of course keep the option to run Flink on YARN and write data to
> HDFS, but this should be optional and not baked into the core.
> 
> Best, Fabian
> 
> 
> 
> 2017-04-07 16:27 GMT+02:00 Petr Novotnik  >:
> 
> Hello,
> 
> with 1.2.0, `WritableTypeInfo` was moved into its own artifact
> (flink-hadoop-compatibility_2.10-1.2.0.jar). Unlike with 1.1.0, the
> distribution jar `flink-dist_2.10-1.2.0.jar` does not include the hadoop
> compatibility classes anymore. However, `TypeExtractor`, which is part of
> the distribution jar, tries to load `WritableTypeInfo` using the class
> loader it was itself loaded from:
> 
> >   Class<?> typeInfoClass;
> >   try {
> >           typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
> >   }
> >   catch (ClassNotFoundException e) {
> >           throw new RuntimeException("Could not load the TypeInformation for the class '"
> >                   + HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
> >   }
> 
> Adding `flink-hadoop-compatibility` to my application jar leads to the
> following stack trace on yarn (running `bin/flink run -m
> yarn-cluster...`):
> 
> > Caused by: java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:998)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
> >   at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
> >   at cz.seznam.euphoria.benchmarks.flink.Util$2.<init>(Util.java:80)
> 
> I guess I'm supposed to customize my Flink installation by adding the
> hadoop-compatibility jar to Flink's `lib` dir, correct? If so, is 

Re: hadoopcompatibility not in dist

2017-04-07 Thread Fabian Hueske
Hi Petr,

I think that's expected behavior because the exception is intercepted
and enriched with an instruction to solve the problem.
As you assumed, you need to add the flink-hadoop-compatibility JAR file to
the ./lib folder. Unfortunately, the file is not included in the binary
distribution.
You can either build it from source or manually download it from a public
Maven repository. You might need to add the flink-shaded-hadoop2 jar file
as well, which is a dependency of flink-hadoop-compatibility.

I think we should make that easier for users and add a pre-built jar file
to the ./opt folder of the binary distribution.
Would you mind opening a JIRA for this?

Now a bit of background on why we moved the TypeInfo to
flink-hadoop-compatibility. We are preparing Flink's core to become
independent of Hadoop, i.e., Flink core should not require Hadoop. We will
of course keep the option to run Flink on YARN and write data to HDFS, but
this should be optional and not baked into the core.

Best, Fabian



2017-04-07 16:27 GMT+02:00 Petr Novotnik:

> Hello,
>
> with 1.2.0, `WritableTypeInfo` was moved into its own artifact
> (flink-hadoop-compatibility_2.10-1.2.0.jar). Unlike with 1.1.0, the
> distribution jar `flink-dist_2.10-1.2.0.jar` does not include the hadoop
> compatibility classes anymore. However, `TypeExtractor`, which is part of
> the distribution jar, tries to load `WritableTypeInfo` using the class
> loader it was itself loaded from:
>
> >   Class<?> typeInfoClass;
> >   try {
> >           typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
> >   }
> >   catch (ClassNotFoundException e) {
> >           throw new RuntimeException("Could not load the TypeInformation for the class '"
> >                   + HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
> >   }
>
> Adding `flink-hadoop-compatibility` to my application jar leads to the
> following stack trace on yarn (running `bin/flink run -m yarn-cluster...`):
>
> > Caused by: java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:998)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
> >   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
> >   at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
> >   at cz.seznam.euphoria.benchmarks.flink.Util$2.<init>(Util.java:80)
>
> I guess I'm supposed to customize my Flink installation by adding the
> hadoop-compatibility jar to Flink's `lib` dir, correct? If so, is this
> documented? I couldn't find any hints in [1] or [2] and thus suppose
> this is perhaps an unintentional change between 1.1 and 1.2.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/hadoop_compatibility.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/migration.html
>
> P.
>
>
>


Flink + Druid example?

2017-04-07 Thread Matt
Hi all,

I'm looking for an example of Tranquility (Druid's lib) as a Flink sink.

I'm trying to follow the code in [1], but I feel it's incomplete or maybe
outdated; it doesn't mention another method (tranquilizer) that seems to
be part of the BeamFactory interface in the current version.

If anyone has any code or a working project to use as a reference, that
would be awesome for me and for the rest of us looking for a time-series
database solution!

Best regards,
Matt

[1] https://github.com/druid-io/tranquility/blob/master/docs/flink.md


Re: Hi

2017-04-07 Thread Fabian Hueske
Hi Wolfe,

that's all correct. Thank you!

I'd like to emphasize that the FsStateBackend stores all state on the heap
of the worker JVM. So you might run into OutOfMemoryErrors if your state
grows too large.
Therefore, the RocksDBStateBackend is the recommended choice for most
production use cases.

Best, Fabian

2017-04-07 16:34 GMT+02:00 Brian Wolfe:

> Hi Kant,
>
> Jumping in here, would love corrections if I'm wrong about any of this.
>
> In short: no, HDFS is not necessary to run stateful stream
> processing. In the minimal case, you can use the MemoryStateBackend to back
> up your state onto the JobManager.
>
> In any production scenario, you will want more durability for your
> checkpoints and support for larger state sizes. To do this, you should use
> either RocksDBStateBackend or FsStateBackend. Assuming you want one of
> these, you will need a checkpoint directory on a filesystem that is
> accessible by all TaskManagers. The filesystem for this checkpointing
> directory (state.backend.*.checkpointdir) can be a shared drive or anything
> supported by the Hadoop file backend; see
> https://hadoop.apache.org/docs/stable/index.html
> under Hadoop Compatible File Systems for other alternatives (S3, for
> example).
>
> Choosing RocksDBStateBackend vs. FsStateBackend is a different decision.
> FsStateBackend stores in-flight state in memory and writes it to your
> durable filesystem only when checkpoints are initiated. The
> RocksDBStateBackend stores in-flight data on local disk (in RocksDB)
> instead of in-memory. When checkpoints are initiated, the appropriate state
> is then written to the durable filesystem. Because it stores state on disk,
> RocksDBStateBackend can handle much larger state than FsStateBackend on
> equivalent hardware.
>
> I'm drawing most of this from this page:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/state_backends.html
>
> Does that make sense?
>
> Cheers,
> Wolfe
>
> ~
> Brian Wolfe
>
>
> On Fri, Apr 7, 2017 at 2:32 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> I read the docs, however I still have the following question: for stateful
>> stream processing, is HDFS mandatory? In some places I see it is
>> required and in other places I see that RocksDB can be used. I just want to
>> know if HDFS is mandatory for stateful stream processing.
>>
>> Thanks!
>>
>
>


Re: Hi

2017-04-07 Thread Brian Wolfe
Hi Kant,

Jumping in here, would love corrections if I'm wrong about any of this.

In short: no, HDFS is not necessary to run stateful stream
processing. In the minimal case, you can use the MemoryStateBackend to back
up your state onto the JobManager.

In any production scenario, you will want more durability for your
checkpoints and support for larger state sizes. To do this, you should use
either RocksDBStateBackend or FsStateBackend. Assuming you want one of
these, you will need a checkpoint directory on a filesystem that is
accessible by all TaskManagers. The filesystem for this checkpointing
directory (state.backend.*.checkpointdir) can be a shared drive or anything
supported by the Hadoop file backend; see
https://hadoop.apache.org/docs/stable/index.html
under Hadoop Compatible File Systems for other alternatives (S3, for
example).

Choosing RocksDBStateBackend vs. FsStateBackend is a different decision.
FsStateBackend stores in-flight state in memory and writes it to your
durable filesystem only when checkpoints are initiated. The
RocksDBStateBackend stores in-flight data on local disk (in RocksDB)
instead of in-memory. When checkpoints are initiated, the appropriate state
is then written to the durable filesystem. Because it stores state on disk,
RocksDBStateBackend can handle much larger state than FsStateBackend on
equivalent hardware.
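
In case it's useful, here is a minimal sketch of selecting either backend on
the streaming environment (Flink 1.2 APIs; the checkpoint URI and interval
are placeholders, and the RocksDB backend additionally needs the
flink-statebackend-rocksdb dependency on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60 seconds; the URI below must be reachable from all TaskManagers.
        env.enableCheckpointing(60_000);

        // Heap-based in-flight state, checkpointed to a durable filesystem:
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Or: in-flight state on local disk via RocksDB, checkpointed to the same filesystem:
        // env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}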

I'm drawing most of this from this page:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/state_backends.html

Does that make sense?

Cheers,
Wolfe

~
Brian Wolfe


On Fri, Apr 7, 2017 at 2:32 AM, kant kodali  wrote:

> Hi All,
>
> I read the docs, however I still have the following question: for stateful
> stream processing, is HDFS mandatory? In some places I see it is
> required and in other places I see that RocksDB can be used. I just want to
> know if HDFS is mandatory for stateful stream processing.
>
> Thanks!
>


hadoopcompatibility not in dist

2017-04-07 Thread Petr Novotnik
Hello,

with 1.2.0, `WritableTypeInfo` was moved into its own artifact
(flink-hadoop-compatibility_2.10-1.2.0.jar). Unlike with 1.1.0, the
distribution jar `flink-dist_2.10-1.2.0.jar` does not include the hadoop
compatibility classes anymore. However, `TypeExtractor`, which is part of
the distribution jar, tries to load `WritableTypeInfo` using the class
loader it was itself loaded from:

>   Class<?> typeInfoClass;
>   try {
>           typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
>   }
>   catch (ClassNotFoundException e) {
>           throw new RuntimeException("Could not load the TypeInformation for the class '"
>                   + HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
>   }

Adding `flink-hadoop-compatibility` to my application jar leads to the
following stack trace on yarn (running `bin/flink run -m yarn-cluster...`):

> Caused by: java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
>   at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
>   at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
>   at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
>   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
>   at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:998)
>   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
>   at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
>   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
>   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
>   at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
>   at cz.seznam.euphoria.benchmarks.flink.Util$2.<init>(Util.java:80)

I guess I'm supposed to customize my Flink installation by adding the
hadoop-compatibility jar to Flink's `lib` dir, correct? If so, is this
documented? I couldn't find any hints in [1] or [2] and thus suppose
this is perhaps an unintentional change between 1.1 and 1.2.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/hadoop_compatibility.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/migration.html

P.






Hi

2017-04-07 Thread kant kodali
Hi All,

I read the docs, however I still have the following question: for stateful
stream processing, is HDFS mandatory? In some places I see it is
required and in other places I see that RocksDB can be used. I just want to
know if HDFS is mandatory for stateful stream processing.

Thanks!


Disk I/O in Flink

2017-04-07 Thread Robert Schmidtke
Hi,

I'm currently examining the I/O patterns of Flink, and I'd like to know
when/how Flink goes to disk. Let me give an overview of what I have
done so far.

I am running TeraGen (from the Hadoop examples package) + TeraSort (
https://github.com/robert-schmidtke/terasort) on a 16 node cluster, each
node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of
disk. I'm using YARN and HDFS. The underlying file system is XFS.

Now before running TeraGen and TeraSort, I reset the XFS counters to zero,
and after TeraGen + TeraSort are finished, I dump the XFS counters again.
Accumulated over the entire cluster I get 3 TiB of writes and 3.2 TiB of
reads. What I'd have expected would be 2 TiB of writes (1 for TeraGen, 1
for TeraSort) and 1 TiB of reads (during TeraSort).

Unsatisfied by the coarseness of these numbers I developed an HDFS wrapper
that logs file system statistics for each call to hdfs://..., such as start
time/end time, no. of bytes read/written etc. I can plot these numbers and
see what I expect: during TeraGen I have 1 TiB of writes to hdfs://...,
during TeraSort I have 1 TiB of reads from and 1 TiB of writes to
hdfs://... So far, so good.

Now this still did not explain the disk I/O, so I added bytecode
instrumentation to a range of Java classes, like FileIn/OutputStream,
RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for memory
mapped files etc., and have the same statistics: start/end of a read
from/write to disk, no. of bytes involved and such. I can plot these
numbers too and see that the HDFS JVMs write 1 TiB of data to disk during
TeraGen (expected) and read and write 1 TiB from and to disk during
TeraSort (expected).

Sorry for the enormous introduction, but now there's finally the
interesting part: Flink's JVMs read from and write to disk 1 TiB of data
each during TeraSort. I'm suspecting there is some sort of spilling
involved, potentially because I have not done the setup properly. But that
is not the crucial point: my statistics give a total of 3 TiB of writes to
disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS counters
from above. However, my statistics only give 2 TiB of reads from disk (1
TiB for HDFS, 1 TiB for Flink), so I'm missing an entire TiB of reads from
disk somewhere. I have done the same with Hadoop TeraSort, and there I'm
not missing any data, meaning my statistics agree with XFS for TeraSort on
Hadoop, which is why I suspect there are some cases where Flink goes to
disk without me noticing it.

So here, finally, is the question: in which cases does Flink go to disk,
and how does it do so (meaning precisely which Java classes are involved,
so I can check my bytecode instrumentation)? This would also include any
kind of resource distribution via HDFS/YARN I guess (like JAR files and I
don't know what). Seeing that I'm missing an amount of data equal to the
size of my input set I'd suspect there must be some sort of
shuffling/spilling at play here, but I'm not sure. Maybe there is also some
sort of remote I/O involved via sockets or so that I'm missing.

Any hints as to where Flink might incur disk I/O are greatly appreciated!
I'm also happy with doing the digging myself, once pointed to the proper
packages in the Apache Flink source tree (I have done my fair share of
inspection already, but could not be sure whether or not I have missed
something). Thanks a lot in advance!

Robert

-- 
My GPG Key ID: 336E2680


Re: Shared DataStream

2017-04-07 Thread nragon
Yeah, I was hoping you'd say that :)
It would be nice to share them, just because of the network overhead between
Flink and Kafka.

Thanks





Re: Shared DataStream

2017-04-07 Thread Fabian Hueske
Flink does not provide any tooling to link two jobs together.
I would use Kafka to connect Flink jobs with each other.
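
For illustration, a rough sketch of that hand-off with the Kafka 0.9
connector from Flink 1.2; the broker address, topic name, and job bodies
below are placeholders, not taken from any existing setup:

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaHandoffSketch {
    // Job A: the processing job publishes its result stream to a topic.
    static void producingJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> transformed = env.fromElements("a", "b", "c"); // placeholder for the real pipeline
        transformed.addSink(new FlinkKafkaProducer09<String>("broker:9092", "shared-topic", new SimpleStringSchema()));
        env.execute("processing job");
    }

    // Job B (e.g., a CEP job): reads the same topic as its source.
    static void consumingJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "cep-job");
        DataStream<String> shared = env.addSource(new FlinkKafkaConsumer09<String>("shared-topic", new SimpleStringSchema(), props));
        shared.print(); // placeholder for the CEP logic
        env.execute("cep job");
    }
}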

Best, Fabian

2017-04-06 17:28 GMT+02:00 nragon:

> Hi,
>
> Can we share the end point of one job (a datastream) with another job?
> The architecture I'm working on should provide abstraction and dynamism
> between processing jobs, which transform and load data into HBase or another
> sink, and CEP jobs, which will be used to detect anomalies. But because the
> datastream of one processing job can be used by N CEP jobs, I wonder if
> sharing datastreams between jobs would be possible. Or if not, do I need to
> send them back to Kafka?
>
> Thanks
>
>
>


Re: Submit Flink job programatically

2017-04-07 Thread Kamil Dziublinski
Hey,

I had a similar problem when I tried to list the jobs and kill one by name
in a YARN cluster. Initially I also tried to set YARN_CONF_DIR, but it
didn't work.
What helped, though, was passing the Hadoop conf dir to my application when
starting it. Like this:
java -cp application.jar:/etc/hadoop/conf

The reason was that my application was finding the default configuration
coming from the Hadoop dependency in the fat jar and was not even trying to
look for anything in the environment variable.
When I passed the Hadoop conf dir to it, it started working properly.
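
If it helps, here is a small diagnostic sketch (my own, not any Flink
tooling) for checking which yarn-site.xml the client JVM actually picked up
and which ResourceManager address it resolved; the property name and its
0.0.0.0:8032 default are standard YARN settings:

import java.net.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnConfCheck {
    public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Where was yarn-site.xml loaded from (fat jar vs. /etc/hadoop/conf on the classpath)?
        URL yarnSite = conf.getResource("yarn-site.xml");
        System.out.println("yarn-site.xml loaded from: " + yarnSite);
        // Which ResourceManager will the client contact? 0.0.0.0:8032 means the default was used.
        System.out.println("yarn.resourcemanager.address = " + conf.get(YarnConfiguration.RM_ADDRESS));
    }
}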

Hope it helps,

Cheers,
Kamil.

On Fri, Apr 7, 2017 at 8:04 AM, Jins George  wrote:

> Hello Community,
>
> I have a need to submit a Flink job to a remote YARN cluster
> programmatically. I tried to use YarnClusterDescriptor.deploy(), but I get
> the message
> RMProxy.java:92:main] - Connecting to ResourceManager at /0.0.0.0:8032.
> It is trying to connect to the resource manager on
> the client machine. I have set the YARN_CONF_DIR on the client machine
> and placed yarn-site.xml, core-site.xml etc. However, it does not seem to
> be picking up these files.
>
> Is this the right way to submit to a remote YARN cluster?
>
>
> Thanks,
> Jins George
>


Submit Flink job programatically

2017-04-07 Thread Jins George

Hello Community,

I have a need to submit a Flink job to a remote YARN cluster 
programmatically. I tried to use YarnClusterDescriptor.deploy(), but I 
get the message
RMProxy.java:92:main] - Connecting to ResourceManager at /0.0.0.0:8032.
It is trying to connect to the resource manager on the client machine. I 
have set the YARN_CONF_DIR on the client machine and placed 
yarn-site.xml, core-site.xml etc. However, it does not seem to be 
picking up these files.


Is this the right way to submit to a remote YARN cluster?


Thanks,
Jins George