Re: Elasticsearch 5.x connection

2017-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

java.lang.NoSuchMethodError: org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195)
        at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95)
        at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(
The reason for this exception is that the `isSerializable` method only exists in 
the 1.3-SNAPSHOT version of `flink-core` at the moment. These kinds of errors 
usually happen when you are using mismatched versions of Flink connector 
libraries and core Flink dependencies.

The Elasticsearch 5 connector will be released with Flink 1.3.0 (the targeted release 
time is the end of May). For the time being, if Elasticsearch 5 is a must, you could try 
implementing a copy of the `isSerializable` method under the exact same package 
path, class name, and method signature in your own project. However, I cannot guarantee 
that this will work, as there may be other conflicts.
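
For reference, a rough sketch of such a shim is shown below. The method body is only a guess at what the 1.3-SNAPSHOT `isSerializable` does (check the actual `flink-core` 1.3-SNAPSHOT sources before relying on it), and whether the shim takes effect at all depends on which copy of the class your classloader picks up:

```java
// Workaround sketch, not Flink code: place this in your own project under
// org/apache/flink/util/ so that the fully-qualified class name and method
// signature match what ElasticsearchSinkBase expects.
package org.apache.flink.util;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;

public final class InstantiationUtil {

    /** Best-effort check: report whether the object can actually be serialized. */
    public static boolean isSerializable(Object o) {
        if (!(o instanceof Serializable)) {
            return false;
        }
        try (ObjectOutputStream out = new ObjectOutputStream(new OutputStream() {
            @Override
            public void write(int b) {
                // discard everything; we only care whether serialization succeeds
            }
        })) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    private InstantiationUtil() {}
}
```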

- Gordon



On March 2, 2017 at 10:47:52 PM, Fábio Dias (fabiodio...@gmail.com) wrote:

java.lang.NoSuchMethodError: org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195)
        at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95)
        at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(

Re: Flink, Yarn and MapR Kerberos issue

2017-03-01 Thread Tzu-Li (Gordon) Tai
Hi Aniket,

Thanks a lot for reporting this.

I’m afraid this seems to be a bug with Flink on YARN’s Kerberos authentication. 
It is incorrectly checking for Kerberos credentials even for non-Kerberos 
authentication methods.
I’ve filed a JIRA for this: https://issues.apache.org/jira/browse/FLINK-5949.

For the time being, I don't think there's a simple way to work around it before 
the bug is fixed, because the bug means that Kerberos credentials are checked for 
regardless of which security type is enabled. We should probably have this fixed soon 
in the next bug fix release for Flink 1.2.

- Gordon


On March 2, 2017 at 7:11:02 AM, ani.desh1512 (ani.desh1...@gmail.com) wrote:

I am trying to set up Flink 1.2 using YARN on MapR (v5.2.0). The MapR cluster on 
which I am trying to set this up is a secure cluster, but it does not use Kerberos. 
MapR, by default, uses some variant of SSL, and it also normally has its own 
JAAS .conf file, which it relies on.

When I try to run yarn-session.sh, I get the following error:

java.lang.RuntimeException: Hadoop security is enabled but the login user
does not have Kerberos credentials
To resolve this I tried the following two things:  

1. I had seen a somewhat similar mention of this issue on JIRA. The issue says that 
it's resolved in 1.2, but the comments on that issue do not indicate that. 
By the way, I have added 
"-Djava.security.auth.login.config=/opt/mapr/conf/mapr.login.conf" in the 
yarn-session.sh file, but I still get the same issue.

So, is this issue resolved? What am I missing here? Why does Flink require  
Kerberos credentials when MapR has no Kerberos setup?  

2. I also tried specifying following in flink-conf.yaml:  
security.ssl.enabled: true  
security.ssl.keystore: /opt/mapr/conf/ssl_keystore  
security.ssl.keystore-password: <>  
security.ssl.key-password: <>  
security.ssl.truststore: /opt/mapr/conf/ssl_truststore  
security.ssl.truststore-password: <>  

But, this too did not solve the problem and I get the same issue. Why is  
Flink trying to get Kerberos credentials even after ssl security is enabled?  

Thanks,  
Aniket  







Re: unclear exception when writing to elasticsearch

2017-03-01 Thread Tzu-Li (Gordon) Tai
Hi Martin,

I followed your setup:

1. Maven java quick start archetype (Flink version 1.1.3)
2. Added `flink-connector-elasticsearch2_2.10` version 1.1.3 dependency
3. Ran the example in the Flink Elasticsearch docs against an Elasticsearch 
2.4.1 installation

and everything worked fine.

Just to make sure nothing is conflicting, you could also try to do a `mvn 
dependency:purge-local-repository` on your project, and then re-download the 
dependencies with `mvn clean install`, and finally re-importing your project in 
the IDE.

Let me know if this works for you!

Cheers,
Gordon


On March 1, 2017 at 9:23:35 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

Hi Martin,

Just letting you know I’m trying your setup right now, and will get back to you 
once I confirm the results.

- Gordon


On March 1, 2017 at 9:15:16 PM, Martin Neumann (mneum...@sics.se) wrote:

I created the project using the maven archetype so I'm using the packaged 
version pulled by maven. 

At this point, I just try to run it directly from inside the IDE (IntelliJ), 
mostly since I don't want to build it and deploy it on the cluster all the 
time. I tried building it (maven 3.0.5), it builds fine but fails to run on the 
cluster with the same exception that I get if I run things from within the IDE. 

My guess is that maybe some function names have changed between Elasticsearch 
versions and they are just not compatible anymore.

In the worst case, I will hack something together that just writes the data 
using HttpURLConnection, pushing things to the REST interface (if that works 
from within Flink).


cheers Martin

On Wed, Mar 1, 2017 at 12:24 PM, Flavio Pompermaier  
wrote:
Did you build Flink from sources or are you using the packaged version? 
Because I had an annoying problem when compiling Flink with Maven > 3.3.
From 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html#dependency-shading:

Maven 3.0.x, 3.1.x, and 3.2.x It is sufficient to call mvn clean install 
-DskipTests in the root directory of Flink code base.

Maven 3.3.x The build has to be done in two steps: First in the base directory, 
then in the distribution project:

mvn clean install -DskipTests
cd flink-dist
mvn clean install
Note: To check your Maven version, run mvn --version. 

On Wed, Mar 1, 2017 at 12:19 PM, Martin Neumann  wrote:
I tried to change the Elasticsearch version to 2.4.1, which results in a new 
exception:

Caused by: java.lang.NoSuchMethodError: 
com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
at org.elasticsearch.threadpool.ThreadPool.(ThreadPool.java:192)
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
at 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)


On Wed, Mar 1, 2017 at 7:58 AM, Tzu-Li (Gordon) Tai  wrote:
Hi Martin,

You can do that by adding a dependency to the Elasticsearch client of your 
desired version in your project.

You can also check what Elasticsearch client version the project is using by 
checking `mvn dependency:tree` from the base directory of your project.

Cheers,
Gordon


On March 1, 2017 at 1:21:56 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

thanks for the fast reply.

I'm currently running things from inside my IDE, so it should not be a packaging 
problem. That said, I added the plugin from the link provided, but I'm not sure 
which Elasticsearch library is needed.

Where do I override the Elasticsearch version? The only thing I'm currently 
using is the flink-connector; do I have to modify its code?


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1.3</version>
</dependency>

One thing I forgot to mention, I can only modify things locally packing it into 
a jar. I'm stuck with stock Flink 1.1.3 for the execution since I'm running 
things on top of Hopsworks.

cheers Martin

On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai  
wrote:
Hi!

This could be an Elasticsearch server / client version conflict, or that the 
uber jar of your code wasn't built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to 
build the Flink Elasticsearch Connector. Could you try overriding this version 
to 2.4.1 when building your code and see if the problem remains?

For the second issue, please check out 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-you

Re: unclear exception when writing to elasticsearch

2017-03-01 Thread Tzu-Li (Gordon) Tai
Hi Martin,

Just letting you know I’m trying your setup right now, and will get back to you 
once I confirm the results.

- Gordon


On March 1, 2017 at 9:15:16 PM, Martin Neumann (mneum...@sics.se) wrote:

I created the project using the maven archetype so I'm using the packaged 
version pulled by maven. 

At this point, I just try to run it directly from inside the IDE (IntelliJ), 
mostly since I don't want to build it and deploy it on the cluster all the 
time. I tried building it (maven 3.0.5), it builds fine but fails to run on the 
cluster with the same exception that I get if I run things from within the IDE. 

My guess is that maybe some function names have changed between Elasticsearch 
versions and they are just not compatible anymore.

In the worst case, I will hack something together that just writes the data 
using HttpURLConnection, pushing things to the REST interface (if that works 
from within Flink).


cheers Martin

On Wed, Mar 1, 2017 at 12:24 PM, Flavio Pompermaier  
wrote:
Did you build Flink from sources or are you using the packaged version? 
Because I had an annoying problem when compiling Flink with Maven > 3.3.
From 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html#dependency-shading:

Maven 3.0.x, 3.1.x, and 3.2.x It is sufficient to call mvn clean install 
-DskipTests in the root directory of Flink code base.

Maven 3.3.x The build has to be done in two steps: First in the base directory, 
then in the distribution project:

mvn clean install -DskipTests
cd flink-dist
mvn clean install
Note: To check your Maven version, run mvn --version. 

On Wed, Mar 1, 2017 at 12:19 PM, Martin Neumann  wrote:
I tried to change the Elasticsearch version to 2.4.1, which results in a new 
exception:

Caused by: java.lang.NoSuchMethodError: 
com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
at org.elasticsearch.threadpool.ThreadPool.(ThreadPool.java:192)
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
at 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)


On Wed, Mar 1, 2017 at 7:58 AM, Tzu-Li (Gordon) Tai  wrote:
Hi Martin,

You can do that by adding a dependency to the Elasticsearch client of your 
desired version in your project.

You can also check what Elasticsearch client version the project is using by 
checking `mvn dependency:tree` from the base directory of your project.

Cheers,
Gordon


On March 1, 2017 at 1:21:56 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

thanks for the fast reply.

I'm currently running things from inside my IDE, so it should not be a packaging 
problem. That said, I added the plugin from the link provided, but I'm not sure 
which Elasticsearch library is needed.

Where do I override the Elasticsearch version? The only thing I'm currently 
using is the flink-connector; do I have to modify its code?


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1.3</version>
</dependency>

One thing I forgot to mention, I can only modify things locally packing it into 
a jar. I'm stuck with stock Flink 1.1.3 for the execution since I'm running 
things on top of Hopsworks.

cheers Martin

On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai  
wrote:
Hi!

This could be an Elasticsearch server / client version conflict, or that the 
uber jar of your code wasn't built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to 
build the Flink Elasticsearch Connector. Could you try overriding this version 
to 2.4.1 when building your code and see if the problem remains?

For the second issue, please check out 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven.

Let me know if the problem remains after trying out the above :-)

Cheers,
Gordon

On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

I'm trying to write to Elasticsearch from a streaming application and I get a 
weird error message that I can't decipher. Hopefully, someone here can help 
me. I'm trying to run the Java example from the website. I double-checked that I 
can reach Elasticsearch from the development machine by putting some data 
in with curl. Does anyone have an idea what the problem is?

Technical info:
Flink 1.1.3

Elasticsearch 2.4.1

http://bbc2.sics.se:19208/
{

"n

Re: unclear exception when writing to elasticsearch

2017-02-28 Thread Tzu-Li (Gordon) Tai
Hi Martin,

You can do that by adding a dependency to the Elasticsearch client of your 
desired version in your project.

You can also check what Elasticsearch client version the project is using by 
checking `mvn dependency:tree` from the base directory of your project.

Cheers,
Gordon

On March 1, 2017 at 1:21:56 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

thanks for the fast reply.

I'm currently running things from inside my IDE, so it should not be a packaging 
problem. That said, I added the plugin from the link provided, but I'm not sure 
which Elasticsearch library is needed.

Where do I override the Elasticsearch version? The only thing I'm currently 
using is the flink-connector; do I have to modify its code?


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1.3</version>
</dependency>

One thing I forgot to mention, I can only modify things locally packing it into 
a jar. I'm stuck with stock Flink 1.1.3 for the execution since I'm running 
things on top of Hopsworks.

cheers Martin

On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai  
wrote:
Hi!

This could be an Elasticsearch server / client version conflict, or that the 
uber jar of your code wasn't built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to 
build the Flink Elasticsearch Connector. Could you try overriding this version 
to 2.4.1 when building your code and see if the problem remains?

For the second issue, please check out 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven.

Let me know if the problem remains after trying out the above :-)

Cheers,
Gordon

On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

I'm trying to write to Elasticsearch from a streaming application and I get a 
weird error message that I can't decipher. Hopefully, someone here can help 
me. I'm trying to run the Java example from the website. I double-checked that I 
can reach Elasticsearch from the development machine by putting some data 
in with curl. Does anyone have an idea what the problem is?

Technical info:
Flink 1.1.3

Elasticsearch 2.4.1

http://bbc2.sics.se:19208/
{

"name" : "hopsworks",
"cluster_name" : "hops",
"cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
"version" : {
"number" : "2.4.1",
"build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
"build_timestamp" : "2016-09-27T18:57:55Z",
"build_snapshot" : false,
"lucene_version" : "5.5.2"
},
"tagline" : "You Know, for Search" 
} 

Changes in the code:
Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be 
buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "hops");

ArrayList<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se"), 
19208));
 

Exception:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.elasticsearch.threadpool.ThreadPool
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
at 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)



Re: ElasticsearchSink Exception

2017-02-28 Thread Tzu-Li (Gordon) Tai
Good to know it’s working! Thanks for the update :-)


On March 1, 2017 at 6:03:44 AM, Govindarajan Srinivasaraghavan 
(govindragh...@gmail.com) wrote:

Hi Gordon/Flavio,

Found out the issue was due to an Elasticsearch version mismatch. Another 
person upgraded the ES version to 5.x but I was using 2.x. After changing the 
version it worked. Thanks for the help.

On Mon, Feb 27, 2017 at 6:12 AM, Tzu-Li (Gordon) Tai  
wrote:
Hi!

Like what Flavio suggested, at a first glance this looks like a problem with 
building the uber jar.

I haven't bumped into the problem while testing the connector with cluster-submitted 
test jobs before, but I can try to test this quickly to make sure.

Could you tell me what your installed Elasticsearch version is? Also, how are 
you building your uber jar?

Cheers,
Gordon


On February 27, 2017 at 9:40:02 PM, Aljoscha Krettek (aljos...@apache.org) 
wrote:

+Tzu-Li (Gordon) Tai Do you have any idea what could be causing this? I'm 
asking because you recently worked on the Elasticsearch connectors, right?

On Sun, 26 Feb 2017 at 04:26 Govindarajan Srinivasaraghavan 
 wrote:
Thanks Flavio. I tried with multiple versions but still the same exception
and I was able to locate the class file inside my jar. Am I missing
something? Thanks for all the help.

On Sat, Feb 25, 2017 at 3:09 PM, Flavio Pompermaier 
wrote:

> The exception you have (NoClassDefFoundError:
> org/elasticsearch/index/mapper/MapperParsingException) is usually caused
> by
> elasticsearch version conflict or a bad shading when creating the uber jar.
> Can you check if one of the 2 is causing the problem?
>
> On 25 Feb 2017 23:13, "Govindarajan Srinivasaraghavan" <
> govindragh...@gmail.com> wrote:
>
> > Hi Flavio,
> >
> > I tried with both http port 9200 and tcp port 9300 and I see incoming
> > connections in the elasticserach node. Also I see the below errors in
> > taskmanager out logs. Below are the dependencies I have on my gradle
> > project. Am I missing something?
> >
> > Exception in thread "elasticsearch[Madame Menace][generic][T#2]"
> > java.lang.NoClassDefFoundError:
> > org/elasticsearch/index/mapper/MapperParsingException
> >         at
> > org.elasticsearch.ElasticsearchException.(
> > ElasticsearchException.java:597)
> >         at
> > org.elasticsearch.transport.TransportService$Adapter$3.
> > run(TransportService.java:622)
> >         at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > ThreadPoolExecutor.java:1142)
> >         at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > ThreadPoolExecutor.java:617)
> >         at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.ClassNotFoundException:
> > org.elasticsearch.index.mapper.MapperParsingException
> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >         ... 5 more
> >
> >
> > Exception in thread "elasticsearch[Saint Elmo][generic][T#2]"
> > java.lang.NoClassDefFoundError: Could not initialize class
> > org.elasticsearch.transport.NodeDisconnectedException
> >         at
> > org.elasticsearch.transport.TransportService$Adapter$3.
> > run(TransportService.java:622)
> >         at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > ThreadPoolExecutor.java:1142)
> >         at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > ThreadPoolExecutor.java:617)
> >         at java.lang.Thread.run(Thread.java:745)
> >
> >
> > compile group: 'org.apache.flink', name: 'flink-core', version: '1.2.0'
> > compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10',
> > version: '1.2.0'
> > compile group: 'org.apache.flink', name: 'flink-java', version: '1.2.0'
> > compile group: 'org.apache.flink', name:
> > 'flink-connector-kafka-0.10_2.10', version: '1.2.0'
> > compile group: 'org.apache.flink', name: 'flink-clients_2.10', version:
> > '1.2.0'
> >
> > compile group: 'org.apache.flink', name:
> > 'flink-connector-elasticsearch2_2.10', version: '1.2.0'
> >
> >
> > On Sat, Feb 25, 2017 at 1:26 AM, Flavio Pompermaier <
> pomperma...@okkam.it>
> > wrote:
> >
> > > Are you sure that in elasticsearch.yml you've enabled ES to listen to
> the
> > > http port 9300?
> >

Re: unclear exception when writing to elasticsearch

2017-02-28 Thread Tzu-Li (Gordon) Tai
Hi!

This could be an Elasticsearch server / client version conflict, or that the 
uber jar of your code wasn't built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to 
build the Flink Elasticsearch Connector. Could you try overriding this version 
to 2.4.1 when building your code and see if the problem remains?

For the second issue, please check out 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven.

Let me know if the problem remains after trying out the above :-)

Cheers,
Gordon

On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote:

Hej,

I'm trying to write to Elasticsearch from a streaming application and I get a 
weird error message that I can't decipher. Hopefully, someone here can help 
me. I'm trying to run the Java example from the website. I double-checked that I 
can reach Elasticsearch from the development machine by putting some data 
in with curl. Does anyone have an idea what the problem is?

Technical info:
Flink 1.1.3

Elasticsearch 2.4.1

http://bbc2.sics.se:19208/
{

"name" : "hopsworks",
"cluster_name" : "hops",
"cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
"version" : {
"number" : "2.4.1",
"build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
"build_timestamp" : "2016-09-27T18:57:55Z",
"build_snapshot" : false,
"lucene_version" : "5.5.2"
},
"tagline" : "You Know, for Search" 
} 

Changes in the code:
Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be 
buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "hops");

ArrayList<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se"), 
19208));
 

Exception:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.elasticsearch.threadpool.ThreadPool
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
at 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)

Re: ElasticsearchSink Exception

2017-02-27 Thread Tzu-Li (Gordon) Tai
Hi!

Like what Flavio suggested, at a first glance this looks like a problem with 
building the uber jar.

I haven't bumped into the problem while testing the connector with cluster-submitted 
test jobs before, but I can try to test this quickly to make sure.

Could you tell me what your installed Elasticsearch version is? Also, how are 
you building your uber jar?

Cheers,
Gordon


On February 27, 2017 at 9:40:02 PM, Aljoscha Krettek (aljos...@apache.org) 
wrote:

+Tzu-Li (Gordon) Tai Do you have any idea what could be causing this? I'm 
asking because you recently worked on the Elasticsearch connectors, right?

On Sun, 26 Feb 2017 at 04:26 Govindarajan Srinivasaraghavan 
 wrote:
Thanks Flavio. I tried with multiple versions but still the same exception
and I was able to locate the class file inside my jar. Am I missing
something? Thanks for all the help.

On Sat, Feb 25, 2017 at 3:09 PM, Flavio Pompermaier 
wrote:

> The exception you have (NoClassDefFoundError:
> org/elasticsearch/index/mapper/MapperParsingException) is usually caused
> by
> elasticsearch version conflict or a bad shading when creating the uber jar.
> Can you check if one of the 2 is causing the problem?
>
> On 25 Feb 2017 23:13, "Govindarajan Srinivasaraghavan" <
> govindragh...@gmail.com> wrote:
>
> > Hi Flavio,
> >
> > I tried with both http port 9200 and tcp port 9300 and I see incoming
> > connections in the elasticserach node. Also I see the below errors in
> > taskmanager out logs. Below are the dependencies I have on my gradle
> > project. Am I missing something?
> >
> > Exception in thread "elasticsearch[Madame Menace][generic][T#2]"
> > java.lang.NoClassDefFoundError:
> > org/elasticsearch/index/mapper/MapperParsingException
> >         at
> > org.elasticsearch.ElasticsearchException.(
> > ElasticsearchException.java:597)
> >         at
> > org.elasticsearch.transport.TransportService$Adapter$3.
> > run(TransportService.java:622)
> >         at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > ThreadPoolExecutor.java:1142)
> >         at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > ThreadPoolExecutor.java:617)
> >         at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.ClassNotFoundException:
> > org.elasticsearch.index.mapper.MapperParsingException
> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >         ... 5 more
> >
> >
> > Exception in thread "elasticsearch[Saint Elmo][generic][T#2]"
> > java.lang.NoClassDefFoundError: Could not initialize class
> > org.elasticsearch.transport.NodeDisconnectedException
> >         at
> > org.elasticsearch.transport.TransportService$Adapter$3.
> > run(TransportService.java:622)
> >         at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > ThreadPoolExecutor.java:1142)
> >         at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > ThreadPoolExecutor.java:617)
> >         at java.lang.Thread.run(Thread.java:745)
> >
> >
> > compile group: 'org.apache.flink', name: 'flink-core', version: '1.2.0'
> > compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10',
> > version: '1.2.0'
> > compile group: 'org.apache.flink', name: 'flink-java', version: '1.2.0'
> > compile group: 'org.apache.flink', name:
> > 'flink-connector-kafka-0.10_2.10', version: '1.2.0'
> > compile group: 'org.apache.flink', name: 'flink-clients_2.10', version:
> > '1.2.0'
> >
> > compile group: 'org.apache.flink', name:
> > 'flink-connector-elasticsearch2_2.10', version: '1.2.0'
> >
> >
> > On Sat, Feb 25, 2017 at 1:26 AM, Flavio Pompermaier <
> pomperma...@okkam.it>
> > wrote:
> >
> > > Are you sure that in elasticsearch.yml you've enabled ES to listen to
> the
> > > http port 9300?
> > >
> > > On 25 Feb 2017 08:58, "Govindarajan Srinivasaraghavan" <
> > > govindragh...@gmail.com> wrote:
> > >
> > > Hi All,
> > >
> > > I'm getting the below exception when I start my flink job. I have
> > verified
> > > the elastic search host and it seems to be working well. I have also
> > tried
> > > including the below depen

Re: Fw: Flink Kinesis Connector

2017-02-27 Thread Tzu-Li (Gordon) Tai
Hi Matt!

As mentioned in the docs, due to the ASL license, we do not deploy the artifact 
to the Maven central repository on Flink releases.
You will need to build the Kinesis connector by yourself (the instructions to 
do so are also in the Flink Kinesis connector docs :)), and install it to your 
local Maven repository.
Then, you’ll be able to add it as a Maven dependency in your projects.

Cheers,
Gordon


On February 27, 2017 at 8:10:52 PM, Matt (mattmcgowan1...@hotmail.com) wrote:

Hi,


I'm working through trying to connect Flink up to a Kinesis stream, based on this: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kinesis.html

It gives the following Maven dependency:


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

However, I'm struggling to find that. It doesn't appear to be up on Maven, nor 
is it listed in the Apache repository.

Does this still exist? Am I missing something obvious?



Thanks in advance for any help,



Matt


Re: Flink the right tool for the job ? Huge Data window lateness

2017-02-24 Thread Tzu-Li (Gordon) Tai
Hi Patrick,

Thanks a lot for feedback on your use case! At a first glance, I would say that 
Flink can definitely solve the issues you are evaluating.

I’ll try to explain them, and point you to some docs / articles that can 
further explain in detail:

- Lateness

The 7-day lateness shouldn't be a problem. We definitely recommend
using RocksDB as the state backend for such a use case since, as you
correctly mentioned, the state would be kept for a long time.
The heavy burst when the locally buffered data on your machines is
sent to Kafka once they come back online shouldn't be a problem either;
since Flink is a pure data streaming engine, it handles backpressure
naturally without any additional mechanisms (I would recommend
taking a look at http://data-artisans.com/how-flink-handles-backpressure/).

- Out of Order

That’s exactly what event time processing is for :-) As long as the event
comes in before the allowed lateness for windows, the event will still fall
into its corresponding event time window. So, even with the heavy burst of
your late machine data, the events will still be aggregated in the correct
windows.
You can look into event time in Flink with more detail in the event time docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html

- Last write wins

The operators that do the aggregations simply need to be able to reprocess
results if they see an event with the same id come in. Additionally, if results are
sent out of Flink and stored in an external DB, and you can design the DB writes
to be idempotent, then it'll effectively be a "last write wins". It depends mostly
on your pipeline and use case.
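To make that concrete, here is a minimal last-write-wins sketch; the Tuple3 layout (sensorId, timestamp, value) and the window size are made up for illustration, and it assumes duplicates arrive within the same window:

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LastWriteWinsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (sensorId, eventTimestamp, value); the "sensor-1 / 1000L" event is sent twice
        // and the later copy should win
        DataStream<Tuple3<String, Long, Double>> events = env.fromElements(
                Tuple3.of("sensor-1", 1000L, 17.0),
                Tuple3.of("sensor-1", 1000L, 17.5),
                Tuple3.of("sensor-2", 1000L, 3.0));

        events
                // key by the unique (sensorId, timestamp) combination
                .keyBy(0, 1)
                .timeWindow(Time.minutes(1))
                // keep only the most recently received event per key
                .reduce(new ReduceFunction<Tuple3<String, Long, Double>>() {
                    @Override
                    public Tuple3<String, Long, Double> reduce(
                            Tuple3<String, Long, Double> earlier,
                            Tuple3<String, Long, Double> later) {
                        return later;
                    }
                })
                .print();

        env.execute("last write wins sketch");
    }
}
```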
- Computations per minute

I think you can simply do this by having two separate window operators.
One that works on your longer window, and another on a per-minute basis.
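A minimal sketch of the two-operator setup, using event time and a 7-day allowed lateness (the tuple layout and window sizes are illustrative only):

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SensorWindowsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // (sensorId, eventTimestamp, value); in the real job this would come from Kafka
        DataStream<Tuple3<String, Long, Double>> readings = env
                .fromElements(
                        Tuple3.of("sensor-1", 1_000L, 17.0),
                        Tuple3.of("sensor-1", 61_000L, 18.0))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Double>>(
                                Time.minutes(1)) {
                            @Override
                            public long extractTimestamp(Tuple3<String, Long, Double> element) {
                                return element.f1; // the event's own timestamp
                            }
                        });

        // per-minute aggregation per sensor, emitted every minute
        readings.keyBy(0)
                .timeWindow(Time.minutes(1))
                .sum(2)
                .print();

        // longer daily window that still accepts events up to 7 days late
        readings.keyBy(0)
                .timeWindow(Time.days(1))
                .allowedLateness(Time.days(7))
                .sum(2)
                .print();

        env.execute("two window operators sketch");
    }
}
```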
Hope this helps!

- Gordon


On February 24, 2017 at 10:49:14 PM, Patrick Brunmayr (j...@kpibench.com) wrote:

Hello

I've done my first steps with Flink and I am very impressed by its 
capabilities. Thank you for that :) I want to use it for a project we are 
currently working on. After reading some documentation,
I am not sure if it's the right tool for the job. We have an IoT application in 
which we are monitoring machines in production plants. The machines have 
sensors attached, and they are sending
their data to a broker (Kafka, Azure IoT Hub), currently on a per-minute basis.

Following requirements must be fulfilled

Lateness

We have to allow lateness of 7 days because machines can have downtime due to 
network issues, maintenance or something else. If that's the case, the data is 
buffered locally on the machine, and once it
is online again all data will be sent to the broker. This can result in some 
really heavy bursts.


Out of order

Events come out of order due to these lateness issues.


Last write wins

Machines are not stateful and cannot guarantee exactly-once sending of their 
data. It can happen that sometimes events are sent twice. In that case the last 
event wins and should override the previous one.
Events are uniquely identified by a sensor_id and a timestamp.

Computations per minute

We cannot wait until the window ends and have to do computations on a 
per-minute basis, for example aggregating data per sensor and writing it to a DB.

My biggest concern in that case is the huge lateness. Keeping data for 7 days 
would result in 10,080 data points for just one sensor! Multiplying that by 
10,000 sensors would result in roughly 100 million data points which Flink
would have to handle in its state. The number of sensors is constantly growing, 
and so will the number of data points.

So my questions are

Is Flink the right tool for the job?

Is that lateness an issue?

How can I implement the last write wins?

How do I tune Flink to handle that growing load of sensors and data points?

What are the hardware requirements, storage and memory size?


I don't want to maintain two code bases for batch and streaming because the 
operations are all equal. The only difference is the time range! That's the 
reason I wanted to do all this with Flink Streaming.

Hope you can guide me in the right direction

Thx









Re: Flink dependencies shading

2017-02-24 Thread Tzu-Li (Gordon) Tai
Hi Dmitry!

I know it's been a while since this was reported, but I was recently looking at 
this issue, and it seems that flink-mesos actually doesn't contain the 
httpclient dependency, so that shouldn't be the problem.

Could it be that you’ve built Flink incorrectly?
Note the info about the Maven versions in the Building Flink from Source docs: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/building.html#dependency-shading.
If you’re using Maven version 3.3.x+ to build Flink, you’ll need to first build 
in the base directory, and then in `flink-dist` again for the dependencies to 
be shaded properly.

- Gordon


On January 27, 2017 at 12:24:35 AM, Dmitry Golubets (dgolub...@gmail.com) wrote:

Hi Robert,

I ended up overriding Flink httpclient version number in main pom file and 
recompiling it.

Thanks


Best regards,
Dmitry

On Thu, Jan 26, 2017 at 4:12 PM, Robert Metzger  wrote:
Hi Dmitry,

I think this issue is new. 
Where is the AWS SDK dependency coming from? Maybe you can resolve the issue on 
your side for now.
I've filed a JIRA for this issue: 
https://issues.apache.org/jira/browse/FLINK-5661



On Wed, Jan 25, 2017 at 8:24 PM, Dmitry Golubets  wrote:
I've built the latest Flink from sources, and it seems that the httpclient dependency 
from flink-mesos is not shaded. It causes trouble with the latest AWS SDK.
Am I building it wrong, or is it a known problem?

Best regards,
Dmitry




Re: About the blob client and blob server authentication

2017-02-23 Thread Tzu-Li (Gordon) Tai
Hi!

https://github.com/apache/flink/pull/2425 is on my list of PRs to review.
However, at the moment I cannot guarantee whether it'll make it in time into 
the 1.3 release.

If you would like to, please feel free to review and comment on the pull 
request also!
We always appreciate extra pairs of eyes on bigger new features like this one 
:-)

Cheers,
Gordon


On February 23, 2017 at 3:57:37 PM, Zhangrucong (zhangruc...@huawei.com) wrote:

Hi:

I found the Flink pull request 2425: https://github.com/apache/flink/pull/2425

This pull request will do the authentication by using a security cookie between the blob 
client and the blob server!

In my opinion, using SASL DIGEST-MD5 would be a more authoritative approach. What do you 
think?

BTW, when will this pull request be merged to master? Thanks in advance!

Re: Serialization schema

2017-02-23 Thread Tzu-Li (Gordon) Tai
Thanks for clarifying. 

From the looks of your exception:

Caused by: java.io.NotSerializableException: 
com.sy.flink.test.Tuple2Serializerr$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

com.sy.flink.test.Tuple2Serializerr$1: this states that an anonymous inner 
class in `Tuple2Serializerr` is not serializable.

Could you check if that’s the case?


On February 24, 2017 at 3:10:58 PM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

But it is not an inner class.

On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai  
wrote:
Since I don’t have your complete code, I’m guessing this is the problem:
Is your `Tuple2Serializer` an inner class? If yes, you should be able to solve 
the problem by declaring `Tuple2Serializer` to be `static`.

This is more of a Java problem -
It isn’t serializable if it isn’t static, because it will contain an implicit 
reference to the enclosing outer class, and therefore serializing it will 
result in serializing the outer class instance as well.


On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

This is at high level what I am doing:

Serialize:

String s = tuple.getPos(0) + "," + tuple.getPos(1);
return s.getBytes()

Deserialize:

String s = new String(message);
String [] sarr = s.split(",");
Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]), 
Integer.valueOf(sarr[1]));

return tuple;


On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai  
wrote:
Hi Mohit,

As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer` 
contains fields that are not serializable, so `Tuple2Serializer` itself is not 
serializable.
Could you perhaps share your `Tuple2Serializer` implementation with us so we 
can pinpoint the problem?

A snippet of the class fields and constructor will do, so you don’t have to 
provide the whole `serialize` / `deserialize` implementation if you don’t want 
to.

Cheers,
Gordon


On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪  wrote:
Hi Mohit
As you did not give the whole code of Tuple2Serializerr, I guess the reason is that 
some fields of Tuple2Serializerr do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia :
I wrote a key serialization class to write to kafka however I am getting this 
error. Not sure why as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: 
com.sy.flink.test.Tuple2Serializerr$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements
        DeserializationSchema<Tuple2<Integer, Integer>>,
        SerializationSchema<Tuple2<Integer, Integer>> {

And called like this:

FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer =
        new FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
                "10.22.4.15:9092",        // broker list
                "my-topic",               // target topic
                new Tuple2Serializerr()); // serialization schema











Re: Serialization schema

2017-02-23 Thread Tzu-Li (Gordon) Tai
Since I don’t have your complete code, I’m guessing this is the problem:
Is your `Tuple2Serializer` an inner class? If yes, you should be able to solve 
the problem by declaring `Tuple2Serializer` to be `static`.

This is more of a Java problem -
It isn’t serializable if it isn’t static, because it will contain an implicit 
reference to the enclosing outer class, and therefore serializing it will 
result in serializing the outer class instance as well.
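
To illustrate the difference, here is a minimal sketch (the schema body is simplified and the class names are made up; imports are for the Flink 1.2-era streaming API):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class Pipeline {

    // Non-static inner class: it keeps an implicit reference to the enclosing
    // Pipeline instance, so serializing the schema also tries to serialize
    // Pipeline and fails if Pipeline is not serializable.
    public class NonStaticTupleSchema implements SerializationSchema<Tuple2<Integer, Integer>> {
        @Override
        public byte[] serialize(Tuple2<Integer, Integer> element) {
            return (element.f0 + "," + element.f1).getBytes();
        }
    }

    // Static nested class: no hidden reference to the outer instance, so it can
    // be serialized and shipped to the cluster on its own.
    public static class StaticTupleSchema implements SerializationSchema<Tuple2<Integer, Integer>> {
        @Override
        public byte[] serialize(Tuple2<Integer, Integer> element) {
            return (element.f0 + "," + element.f1).getBytes();
        }
    }
}
```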


On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

This is at high level what I am doing:

Serialize:

String s = tuple.getPos(0) + "," + tuple.getPos(1);
return s.getBytes()

Deserialize:

String s = new String(message);
String [] sarr = s.split(",");
Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]), 
Integer.valueOf(sarr[1]));

return tuple;


On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai  
wrote:
Hi Mohit,

As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer` 
contains fields that are not serializable, so `Tuple2Serializer` itself is not 
serializable.
Could you perhaps share your `Tuple2Serializer` implementation with us so we 
can pinpoint the problem?

A snippet of the class fields and constructor will do, so you don’t have to 
provide the whole `serialize` / `deserialize` implementation if you don’t want 
to.

Cheers,
Gordon


On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪  wrote:
Hi Mohit
As you did not give the whole code of Tuple2Serializerr, I guess the reason is that 
some fields of Tuple2Serializerr do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia :
I wrote a key serialization class to write to kafka however I am getting this 
error. Not sure why as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: 
com.sy.flink.test.Tuple2Serializerr$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements
        DeserializationSchema<Tuple2<Integer, Integer>>,
        SerializationSchema<Tuple2<Integer, Integer>> {

And called like this:

FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer =
        new FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
                "10.22.4.15:9092",        // broker list
                "my-topic",               // target topic
                new Tuple2Serializerr()); // serialization schema










Re: Writing Tuple2 to a sink

2017-02-23 Thread Tzu-Li (Gordon) Tai
Hi Mohit,

I don’t completely understand your question, but I’m assuming that you know the 
type of records your custom sink will be receiving, but you don’t know how to 
extract values from the records.

Assume that the type of the incoming records will be `Tuple2<String, Integer>`. 
When writing your custom sink, you should define that type by:

```
public class YourCustomSink implements SinkFunction<Tuple2<String, Integer>> {
    …

    public void invoke(Tuple2<String, Integer> next) {
        // use next.f0 / next.f1 to retrieve values from the tuple
    }

    ...
}
```

You can of course also define generic types to replace `String` and `Integer`, 
like so:

```
public class YourCustomSink<F, S> implements SinkFunction<Tuple2<F, S>> {
    …

    public void invoke(Tuple2<F, S> next) {
        F field1 = next.f0;
        S field2 = next.f1;
        ...
    }

    ...
}
```

Just replace the generic types with concrete types when instantiating your 
custom sink, according to your topology.
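
For completeness, a small self-contained usage sketch along the lines of the snippets above (the class and job are made up for illustration):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class CustomSinkUsage {

    // A tiny concrete variant of the generic sink above that just prints both fields.
    public static class PrintingTupleSink<F, S> implements SinkFunction<Tuple2<F, S>> {
        @Override
        public void invoke(Tuple2<F, S> next) throws Exception {
            System.out.println("field1=" + next.f0 + ", field2=" + next.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> counts =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

        // The generic parameters are fixed to concrete types at instantiation time.
        counts.addSink(new PrintingTupleSink<String, Integer>());

        env.execute("custom sink usage sketch");
    }
}
```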

Let me know if this answers your question!

Cheers,
Gordon

On February 24, 2017 at 10:42:33 AM, 刘彪 (mmyy1...@gmail.com) wrote:

Currently, OutputFormat is used for DataSet and SinkFunction is used for 
DataStream. Maybe I misunderstand your problem. It would be better if you could give 
more details.

2017-02-24 5:21 GMT+08:00 Mohit Anchlia :
This works for Kafka but for the other types of sink am I supposed to use some 
type of outputformat?

On Tue, Feb 21, 2017 at 7:13 PM, 刘彪  wrote:
Hi
I think there is a good way to deal with this situation in FlinkKafkaProducerBase.java. 
There is a KeyedSerializationSchema that the user has to implement.
The KeyedSerializationSchema will be used to serialize the data, so that the SinkFunction 
just needs to understand the type after serialization.
In your case, I think you can add a SerializationSchema interface to the 
SinkFunction, and the user has to implement that SerializationSchema, maybe named 
Tuple2SerializationSchema.

2017-02-22 7:17 GMT+08:00 Mohit Anchlia :
What's the best way to retrieve both the values in Tuple2 inside a custom sink 
given that the type is not known inside the sink function?





Re: Serialization schema

2017-02-23 Thread Tzu-Li (Gordon) Tai
Hi Mohit,

As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer` 
contains fields that are not serializable, so `Tuple2Serializer` itself is not 
serializable.
Could you perhaps share your `Tuple2Serializer` implementation with us so we 
can pinpoint the problem?

A snippet of the class fields and constructor will do, so you don’t have to 
provide the whole `serialize` / `deserialize` implementation if you don’t want 
to.

Cheers,
Gordon


On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪  wrote:
Hi Mohit
As you did not give the whole code of Tuple2Serializerr, I guess the reason is that 
some fields of Tuple2Serializerr do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia :
I wrote a key serialization class to write to kafka however I am getting this 
error. Not sure why as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: 
com.sy.flink.test.Tuple2Serializerr$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements
        DeserializationSchema<Tuple2<Integer, Integer>>,
        SerializationSchema<Tuple2<Integer, Integer>> {

And called like this:

FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer =
        new FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
                "10.22.4.15:9092",        // broker list
                "my-topic",               // target topic
                new Tuple2Serializerr()); // serialization schema









Re: Apache Flink and Elasticsearch send Json Object instead of string

2017-02-22 Thread Tzu-Li (Gordon) Tai
Hi,

The Flink Elasticsearch Sink uses the Elasticsearch Java client to send the 
indexing requests, so whatever the client supports, it will be achievable 
through the `ElasticsearchSinkFunction` also.

From a quick check at the Elasticsearch Javadocs, I think you can also just set 
the document json as a String in the created `IndexRequest`. So,

public IndexRequest createIndexRequest(String element){

                    HashMap esJson = new HashMap<>();
                    
                    esJson.put("data", element);
                    
                    return Requests
                            .indexRequest()
                            .index("logs")
                            .type("object")
                            .source(esJson);
                }

Here, if `element` is already a Json string representing the document, you can 
just do 

return Requests
    .indexRequest()
    .index(“logs”)
    .type(“object”)
    .source(“the Json String”);

The `.source(…)` method has quite a few variants on how to set the source, and 
providing a Map is only one of them.
Please refer to the Elasticsearch Javadocs for the full list 
(https://www.javadoc.io/doc/org.elasticsearch/elasticsearch/5.2.1).
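
As an illustration of getting the nested structure you described (rather than a string field), one option is to parse the string into a Map before handing it to `.source(...)`. The sketch below assumes Jackson is available on the classpath, which is not something the Flink connector itself requires:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class NestedJsonIndexRequests {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static IndexRequest createIndexRequest(String element) throws IOException {
        // Parse the JSON string into a Map so that Elasticsearch stores "data"
        // as a structured object instead of one long string.
        Map<String, Object> esJson = new HashMap<>();
        esJson.put("data", MAPPER.readValue(element, Map.class));

        return Requests.indexRequest()
                .index("logs")
                .type("object")
                .source(esJson);
    }
}
```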

Hope this helps!

Cheers,
Gordon
On February 21, 2017 at 5:43:36 PM, Fábio Dias (fabiodio...@gmail.com) wrote:

Hi, 
thanks for the reply.

Isn't there another way to do that?
Using REST you can send JSON like this:

curl -XPOST 'localhost:9200/customer/external?pretty&pretty' -H 'Content-Type: 
application/json' -d'
{
 "name": "Jane Doe"
}
'

In my case I have json like this: 

{
      "filters" : {
                        "id" : 1,
                        "name": "abc"
                    }
}

How can I treat these cases? Isn't there a way to send the whole JSON element and 
have it indexed like in the REST request?

Thanks.

Tzu-Li (Gordon) Tai wrote on Tuesday, 21/02/2017 at 
07:54:
Hi,

I’ll use your code to explain.

public IndexRequest createIndexRequest(String element){

                    HashMap esJson = new HashMap<>();
                    
                    esJson.put("data", element);
What you should do here is parse the field values from `element`, and simply 
treat them as key-value pairs of the `esJson` map.

So, the `esJson` should be prepared by doing:

esJson.put(“id”, 6);

esJson.put(“name”, “A green door”);

esJson.put(“price”, 12.5);

etc.



Cheers,

Gordon



On February 21, 2017 at 12:41:40 AM, Fábio Dias (fabiodio...@gmail.com) wrote:

Hi,

I'm using Flink and Elasticsearch, and I want to receive in Elasticsearch a JSON 
object ({"id":1, "name":"X"}, etc.). I already have a string with this 
information, but I don't want to save it as a string.

I receive this:

{
  "_index": "logs",
  "_type": "object",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
    "data": "{\"id\":6,\"name\":\"A green 
door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
  }
}

And I want to receive this:

{
"_index": "logs",
"_type": "external",
"_id": "AVpcARfkfYWqSubr0ZvK",
"_score": 1,
"_source": {
"data": {
"id":6,
"name":"A green door",
"price":12.5,
"tags":
["home","green"]
}
}
}

my java code:

try {
            ArrayList<InetSocketAddress> transports = new ArrayList<>();
            transports.add(new InetSocketAddress("127.0.0.1", 9300));

            ElasticsearchSinkFunction<String> indexLog =
                    new ElasticsearchSinkFunction<String>() {

private static final long serialVersionUID = 8802869701292023100L;

public IndexRequest createIndexRequest(String element){

                    HashMap esJson = new HashMap<>();
                    
                    esJson.put("data", element);
                    
                    
                    
                    return Requests
                            .indexRequest()
                            .index("logs")
                            .type("object")
                            .source(esJson);
                }
@Override
                public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            };

            ElasticsearchSink<String> esSink =
                    new ElasticsearchSink<>(config, transports, indexLog);
            input.addSink(esSink);
        } 
        catch (Exception e) {
            System.out.println(e);
        }


Do I need to treat every entry as a map? Can I just send an object with key 
values?

Thanks.

Re: Apache Flink and Elasticsearch send Json Object instead of string

2017-02-20 Thread Tzu-Li (Gordon) Tai
Hi,

I’ll use your code to explain.

public IndexRequest createIndexRequest(String element){

                    HashMap esJson = new HashMap<>();
                    
                    esJson.put("data", element);
What you should do here is parse the field values from `element`, and simply 
treat them as key-value pairs of the `esJson` map.

So, the `esJson` should be prepared by doing:

esJson.put(“id”, 6);

esJson.put(“name”, “A green door”);

esJson.put(“price”, 12.5);

etc.



Cheers,

Gordon



On February 21, 2017 at 12:41:40 AM, Fábio Dias (fabiodio...@gmail.com) wrote:

Hi,

I'm using Flink and Elasticsearch, and I want to receive in Elasticsearch a JSON 
object ({"id":1, "name":"X"}, etc.). I already have a string with this 
information, but I don't want to save it as a string.

I receive this:

{
  "_index": "logs",
  "_type": "object",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
    "data": "{\"id\":6,\"name\":\"A green 
door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
  }
}

And I want to receive this:

{
"_index": "logs",
"_type": "external",
"_id": "AVpcARfkfYWqSubr0ZvK",
"_score": 1,
"_source": {
"data": {
"id":6,
"name":"A green door",
"price":12.5,
"tags":
["home","green"]
}
}
}

my java code:

try {
            ArrayList<InetSocketAddress> transports = new ArrayList<>();
            transports.add(new InetSocketAddress("127.0.0.1", 9300));

            ElasticsearchSinkFunction<String> indexLog =
                    new ElasticsearchSinkFunction<String>() {

private static final long serialVersionUID = 8802869701292023100L;

public IndexRequest createIndexRequest(String element){

                    HashMap esJson = new HashMap<>();
                    
                    esJson.put("data", element);
                    
                    
                    
                    return Requests
                            .indexRequest()
                            .index("logs")
                            .type("object")
                            .source(esJson);
                }
@Override
                public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            };

            ElasticsearchSink<String> esSink =
                    new ElasticsearchSink<>(config, transports, indexLog);
            input.addSink(esSink);
        } 
        catch (Exception e) {
            System.out.println(e);
        }


Do I need to treat every entry as a map? Can I just send an object with key 
values?

Thanks.

Re: kinesis producer setCustomPartitioner use stream's own data

2017-02-20 Thread Tzu-Li (Gordon) Tai
Hi Sathi,

The `getPartitionId` method is invoked with each record from the stream. In 
there, you can extract values / fields from the record, and use that to 
determine the target partition id.
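
For example, a minimal sketch against the Flink 1.2 Kinesis connector API (the CSV-style record layout and the `accountId` field are made up for illustration; adapt the extraction to your actual event format):

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class PartitionerSketch {

    // Derives the partition key from each record instead of using a fixed value.
    public static class AccountIdPartitioner extends KinesisPartitioner<String> {
        @Override
        public String getPartitionId(String element) {
            // Assumed record layout: "accountId,region,payload"
            String[] fields = element.split(",", -1);
            return fields.length > 0 ? fields[0] : element;
        }
    }

    public static FlinkKinesisProducer<String> buildProducer(Properties producerConfig) {
        FlinkKinesisProducer<String> kinesis =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        kinesis.setDefaultStream("k1");
        kinesis.setCustomPartitioner(new AccountIdPartitioner());
        return kinesis;
    }
}
```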

Is this what you had in mind?

Cheers,
Gordon

On February 21, 2017 at 11:54:21 AM, Sathi Chowdhury 
(sathi.chowdh...@elliemae.com) wrote:

Hi flink users and experts,

 

In my Flink processor I am trying to use the Flink Kinesis connector. I read from 
a Kinesis stream, and after the transformation (for which I use 
RichCoFlatMapFunction), the JSON event needs to be written to a Kinesis stream k1.

DataStream myStream = see.addSource(new 
FlinkKinesisConsumer<>(inputStream, new MyDeserializationSchema(), 
consumerConfig));
 

 

For setting up the producer, including partitioning, I want to use 
setCustomPartitioner, but the problem is that I don't know how to access 
parameters inside myStream. I have multiple fields that I want to extract from 
the stream right there in the main method and use them in deciding the 
partition key. Is it possible to choose a partition key that is prepared from the 
stream? If so, can you please share an example?

 

 


kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
    @Override
    public String getPartitionId(String element) {
        int l = element.length();   // here I want to bring in values extracted from the stream
        return element.substring(l - 1, l);
    }
});

 

Thanks

Sathi


Re: Streaming data from MongoDB using Flink

2017-02-17 Thread Tzu-Li (Gordon) Tai
Sorry, I just realized I didn’t notice the second part question of your last 
email when replying.
Thanks Till for answering it!


On February 17, 2017 at 6:05:58 PM, Till Rohrmann (trohrm...@apache.org) wrote:

Dear Gordon,

Thanks for your help, I think I am on the right track as of now.

On the other hand, I have another question: is it possible to add sources to 
environments that are already executing? In what I am currently developing, I 
need to add new sources as they arrive to my system.

I will wait to hear from you!

Regards,




Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

2017-02-16 Thread Tzu-Li (Gordon) Tai
Hi Geoffrey,

Thanks for investigating and updating on this. Good to know that it is working!

Just to clarify, was your series of jobs submitted to a “yarn session + regular 
bin/flink run”, or “per job yarn cluster”?
I’m asking just to make sure of the limitations Robert mentioned.

Cheers,
Gordon


On February 17, 2017 at 3:37:27 AM, Geoffrey Mon (geof...@gmail.com) wrote:

Hi Robert,

Thanks for your reply. I've done some further testing and (hopefully) solved 
the issue; this turned out to be a red herring.  After discovering that the 
same issue manifested itself when testing on my local machine, I found that 
multiple jobs can be submitted from a main() function for both temporary and 
permanent Flink YARN clusters, and that the issue was not with Flink or with 
YARN, but with my job file.

In one part of my job, I need to fill in missing components of a vector with 
zeroes. I did this by combining the vector DataSet with another DataSet 
containing indexed zeroes using a union operation and an aggregation operation. 
In my problematic job, I used ExecutionEnvironment#fromElements to make a 
DataSet out of an ArrayList of Tuples containing an index and a zero. However, 
for input files with very large parameters, I needed to generate very large 
length DataSets of zeroes, and since I was using fromElements, the client 
needed to send the Flink runtime all of the elements with which to create the 
DataSet (lots and lots of zeroes). This caused the job to time out before 
execution, making me think that the job had not been properly received by the 
runtime.

I've replaced this with ExecutionEnvironment#generateSequence and a map 
function mapping each number of the generated sequence to a tuple with a zero. 
This has solved the issue and my job seems to be running fine for now.
(https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370)
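For reference, a minimal sketch of the fromElements -> generateSequence replacement described above ('length' is a placeholder for the vector size):

DataSet<Tuple2<Long, Double>> zeroes = env
        .generateSequence(0, length - 1)
        .map(new MapFunction<Long, Tuple2<Long, Double>>() {
            @Override
            public Tuple2<Long, Double> map(Long index) {
                // each generated index becomes an (index, 0.0) entry,
                // so the zeroes are produced in parallel on the cluster
                // instead of being shipped from the client
                return new Tuple2<>(index, 0.0);
            }
        });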

Again, thank you very much for your help.

Sincerely,
Geoffrey

On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger  wrote:
Hi Geoffrey,

I think the "per job yarn cluster" feature does probably not work for one 
main() function submitting multiple jobs.
If you have a yarn session + regular "flink run" it should work. 

On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon  wrote:
Just to clarify, is Flink designed to allow submitting multiple jobs from a 
single program class when using a YARN cluster? I wasn't sure based on the 
documentation.

Cheers,
Geoffrey


On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon  wrote:
Hello all,

I'm running a Flink plan made up of multiple jobs. The source for my job can be 
found here if it would help in any way: 
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
Each of the jobs (except for the first job) depends on files generated by the 
previous job; I'm running it on an AWS EMR cluster using YARN.

When I submit the plan file, the first job runs as planned. After it completes, 
the second job is submitted by the YARN client:


02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED 
02/09/2017 16:39:43 Job execution switched to status FINISHED.
2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
Waiting until all TaskManagers have connected
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
TaskManager status (5/5)
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
All TaskManagers are connected
2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@.ec2.internal:35598/user/jobmanager#68430682]

If the input file is small and the first job runs quickly (~1 minute works for 
me), then the second job runs fine. However, if the input file for my first job 
is large and the first job takes more than a minute or so to complete, Flink 
will not acknowledge receiving the next job; the web Flink console does not 
show any new jobs and Flink logs do not mention receiving any new jobs after 
the first job has completed. The YARN client's job submission times out after 
Flink does not respond:

Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
        at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
        at org.apache.flink.runtime.client.JobClientActor.handleMessag

Re: Streaming data from MongoDB using Flink

2017-02-16 Thread Tzu-Li (Gordon) Tai
Good to know!


On February 16, 2017 at 10:13:28 PM, Pedro Monteiro 
(pedro.mlmonte...@gmail.com) wrote:

Dear Gordon,

Thanks for your help, I think I am on the right track as of now.

On the other hand, I have another question: is it possible to add sources to 
environments that are already executing? In what I am currently developing, I 
need to add new sources as they arrive to my system.

I will wait to hear from you!

Regards,

Pedro Lima Monteiro

On 16 February 2017 at 11:29, Pedro Monteiro  wrote:
Thank you again for your prompt response.

I will give it a try and will come back to you.

Pedro Lima Monteiro

On 16 February 2017 at 10:20, Tzu-Li (Gordon) Tai  wrote:
I would recommend checking out the Flink RabbitMQ Source for examples:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java

For your case, you should extend the `RichSourceFunction` which provides 
additional access to override the `open()` life cycle method.
In that method, you instantiate your MongoDB client connection and  fetch the 
cursor. In the `run()` method, you should essentially have a while loop that 
polls the MongoDB cursor and emits the fetched documents using the 
`SourceContext`.
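For illustration, a minimal sketch of such a source, assuming the MongoDB Java driver is on the classpath (host, database and collection names are placeholders; a tailable cursor on a capped collection would be needed to keep streaming newly inserted documents):

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.bson.Document;

public class MongoSource extends RichSourceFunction<String> {

    private transient MongoClient client;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) {
        // instantiate the client connection in the open() life cycle method
        client = new MongoClient("localhost", 27017);
    }

    @Override
    public void run(SourceContext<String> ctx) {
        MongoCollection<Document> collection =
                client.getDatabase("mydb").getCollection("events");
        try (MongoCursor<Document> cursor = collection.find().iterator()) {
            // poll the cursor and emit each fetched document
            while (running && cursor.hasNext()) {
                ctx.collect(cursor.next().toJson());
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}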

If you're also looking to implement a MongoDB source that works with Flink’s
checkpointing for exactly-once, be sure to check out:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions

Cheers,
Gordon
On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Dear Tzu-Li,

Thank you so much for your prompt response.

Let's assume I have a variable, in Java, env, which is my StreamExecutionEnvironment. When I go ahead and attempt to do:

env.addSource();

It requests an implementation of the SourceFunction interface:

env.addSource(new SourceFunction() {
    @Override
    public void run(SourceFunction.SourceContext ctx) throws Exception {
        // TO DO
    }

    @Override
    public void cancel() {
        // TO DO
    }
});

And this is where I'm somewhat stuck. I do not understand how I should access my MongoDB cursor in any of these methods (I suppose the most adequate would be the "run" method) in a way that would allow me to emit a new MongoDB document as it arrives in the database from another source.

Once again, thank you so much for your help.

I will wait to hear from you!​

Regards,

Pedro Lima Monteiro

On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai  wrote:
Hi Pedro!

This is definitely possible, by simply writing a Flink `SourceFunction` that 
uses MongoDB clients to fetch the data.
It should be straightforward and works well with MongoDB’s cursor APIs.

Could you explain a bit which part in particular you were stuck with?

Cheers,
Gordon


On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Good morning,

I am trying to get data from MongoDB to be analysed in Flink.
I would like to know if it is possible to stream data from MongoDB into
Flink. I have looked into Flink's source function to add in the addSource
method of the StreamExecutionEnvironment but I had no luck.
Can anyone help me out?
Thanks.

Pedro Lima Monteiro





Re: Streaming data from MongoDB using Flink

2017-02-16 Thread Tzu-Li (Gordon) Tai
I would recommend checking out the Flink RabbitMQ Source for examples:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java

For your case, you should extend the `RichSourceFunction` which provides 
additional access to override the `open()` life cycle method.
In that method, you instantiate your MongoDB client connection and  fetch the 
cursor. In the `run()` method, you should essentially have a while loop that 
polls the MongoDB cursor and emits the fetched documents using the 
`SourceContext`.

If you're also looking to implement a MongoDB source that works with Flink’s
checkpointing for exactly-once, be sure to check out:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions

Cheers,
Gordon
On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Dear Tzu-Li,

Thank you so much for your prompt response.

Let's assume I have a variable, in Java, env, which is my StreamExecutionEnvironment. When I go ahead and attempt to do:

env.addSource();

It requests an implementation of the SourceFunction interface:

env.addSource(new SourceFunction() {
    @Override
    public void run(SourceFunction.SourceContext ctx) throws Exception {
        // TO DO
    }

    @Override
    public void cancel() {
        // TO DO
    }
});

And this is where I'm somewhat stuck. I do not understand how I should access my MongoDB cursor in any of these methods (I suppose the most adequate would be the "run" method) in a way that would allow me to emit a new MongoDB document as it arrives in the database from another source.

Once again, thank you so much for your help.

I will wait to hear from you!​

Regards,

Pedro Lima Monteiro

On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai  wrote:
Hi Pedro!

This is definitely possible, by simply writing a Flink `SourceFunction` that 
uses MongoDB clients to fetch the data.
It should be straightforward and works well with MongoDB’s cursor APIs.

Could you explain a bit which part in particular you were stuck with?

Cheers,
Gordon


On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Good morning,

I am trying to get data from MongoDB to be analysed in Flink.
I would like to know if it is possible to stream data from MongoDB into
Flink. I have looked into Flink's source function to add in the addSource
method of the StreamExecutionEnvironment but I had no luck.
Can anyone help me out?
Thanks.

Pedro Lima Monteiro



Re: Streaming data from MongoDB using Flink

2017-02-16 Thread Tzu-Li (Gordon) Tai
Hi Pedro!

This is definitely possible, by simply writing a Flink `SourceFunction` that 
uses MongoDB clients to fetch the data.
It should be straightforward and works well with MongoDB’s cursor APIs.

Could you explain a bit which part in particular you were stuck with?

Cheers,
Gordon


On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Good morning,

I am trying to get data from MongoDB to be analysed in Flink.
I would like to know if it is possible to stream data from MongoDB into
Flink. I have looked into Flink's source function to add in the addSource
method of the StreamExecutionEnvironment but I had no luck.
Can anyone help me out?
Thanks.

Pedro Lima Monteiro

Re: #kafka/#ssl: how to enable ssl authentication for a new kafka consumer?

2017-02-14 Thread Tzu-Li (Gordon) Tai
Hi Alex,

Kafka authentication and data transfer encryption using SSL can simply be
done by configuring the brokers and the connecting client.

You can take a look at this:
https://kafka.apache.org/documentation/#security_ssl.

The Kafka client that the Flink connector uses can be configured through the
`Properties` configuration provided when instantiating `FlinkKafkaConsumer`.
You just need to set values for these config properties:
https://kafka.apache.org/documentation/#security_configclients.

Note that SSL truststore / keystore locations must exist on all of your
Flink TMs for this to work.
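For illustration, a minimal sketch of such a configuration, assuming the 0.9 consumer (broker address, file paths and passwords are placeholders; the keystore entries are only needed if the brokers require client authentication):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9093");
props.setProperty("group.id", "my-group");
// SSL settings of the Kafka client, as per the Kafka security documentation
props.setProperty("security.protocol", "SSL");
props.setProperty("ssl.truststore.location", "/path/on/every/taskmanager/client.truststore.jks");
props.setProperty("ssl.truststore.password", "truststore-password");
props.setProperty("ssl.keystore.location", "/path/on/every/taskmanager/client.keystore.jks");
props.setProperty("ssl.keystore.password", "keystore-password");
props.setProperty("ssl.key.password", "key-password");

FlinkKafkaConsumer09<String> consumer =
        new FlinkKafkaConsumer09<>("secured-topic", new SimpleStringSchema(), props);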

Hope this helps!

- Gordon



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/kafka-ssl-how-to-enable-ssl-authentication-for-a-new-kafka-consumer-tp11532p11610.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: A way to control redistribution of operator state?

2017-02-13 Thread Tzu-Li (Gordon) Tai
Hi Dmitry,

Technically, from the looks of the internal code around `OperatorStateRepartitioner`, I think it could certainly be made pluggable.
Right now it is just hard-coded to use a round-robin repartitioner implementation by default.

However, I’m not sure of the plans in exposing this to the user and making it 
configurable.
Looping in Stefan (in cc) who mostly worked on this part and see if he can 
provide more info.

- Gordon

On February 14, 2017 at 2:30:27 AM, Dmitry Golubets (dgolub...@gmail.com) wrote:

Hi,

It looks impossible to implement a keyed state with operator state now.

I know it sounds like "just use a keyed state", but the latter requires updating it on every value change, as opposed to operator state, and thus can be expensive (especially if you have to deal with mutable structures inside which have to be serialized).

The problem is that there is no way to tell Flink how to reassign savepoint 
parts between partitions, and thus impossible to route data to correct 
partitions.

Is there anything I missed or maybe a plan to implement it in future?

Best regards,
Dmitry

Re: There is no Open and Close method in Async I/O API of Scala

2017-02-12 Thread Tzu-Li (Gordon) Tai
Hi Howard,

I don’t think there is a rich variant for Async IO in Scala yet. We should 
perhaps add support for it.

Looped in Till who worked on the Async IO and its Scala support to clarify 
whether there were any concerns in not supporting it initially.

Cheers,
Gordon


On February 13, 2017 at 9:49:32 AM, Howard,Li(vip.com) (howard...@vipshop.com) 
wrote:

Hi,

 I’m going to test the Scala version of async I/O. As we can see in 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html,
the Java version of the async I/O API has open and close methods, in which I can do some init and cleanup work. The Scala API, however, has neither open nor close. Even if I can do the init work while constructing the class, the cleanup work can’t be done because there is no close method.

 When I look into RichAsyncFunction, which the Java API extends from, I find it is a subclass of AbstractRichFunction, which provides the open and close methods. I tried to make my async I/O function extend both AbstractRichFunction and AsyncFunction, but found out that the open method is not called by Flink, so it won’t work.

 I managed to find a workaround by getting the javaStream from the Scala stream and using the Java API instead, but I don’t think it’s ideal.

 Did I miss something? Or is it just a bug? If it is a bug, I can open an issue and try to fix it.

 

Thanks.

 

Howard

 

Li Zhehao (Howard) | Technology Center, Real-time Computing Platform

vip.com | Vipshop

Vipshop, a website dedicated to flash sales

Mobile: 15210965971 / Email: howard...@vipshop.com

Address: 3F, Sihang Tiandi, 18 North Xizang Road, Zhabei District, Shanghai, China

US-listed company, NYSE: VIPS, www.vip.com

 


Re: Specifying Schema dynamically

2017-02-12 Thread Tzu-Li (Gordon) Tai
Hi Luqman,

From your description, it seems that you want to infer the type (case class, tuple, etc.) of a stream dynamically at runtime.
AFAIK, I don’t think this is supported in Flink. You’re required to have 
defined types for your DataStreams.

Could you also provide example code showing what the functionality you have in mind would look like?
That would help clarify if I have misunderstood and there’s actually a way to 
do it.

- Gordon

On February 12, 2017 at 4:30:56 PM, Luqman Ghani (lgsa...@gmail.com) wrote:

Like if a file has a header: id, first_name, last_name, last_login
and we infer schema as: Int, String, String, Long



Re: Start streaming tuples depending on another streams rate

2017-02-09 Thread Tzu-Li (Gordon) Tai
Hi Jonas,

A few things to clarify first:

Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, 
the rate drops to 10 tuples/s.

From this description it seems like the job is re-reading the topic from the beginning, and once you reach the latest record at the head of the queue, you start getting the normal input rate again, correct?

What I now want is that while tuples from A are being processed in flatMap1, 
the stream B in flatMap2 should wait until the rate of the A stream has dropped 
and only then, be flatMap2 should be called.

So what you are looking for is that flatMap2 for stream B only does work after the job reaches the latest record in stream A?

If that’s the case, I would not rely on detecting a drop below a threshold rate value. It isn’t reliable because it depends on stream A’s actual input rate, which, as with any stream, naturally changes over time.

I’m not sure if it’s the best solution, but this is what I have in mind:
You could perhaps insert a special marker event into stream A every time you 
start running this job.
Your job can have an operator before your co-flatMap operator that expects this 
special marker, and when it receives it (which is when the head of stream A is 
reached),  broadcasts a special event to the co-flatMap for flatMap2 to be 
processed.
Then, once flatMap2 is invoked with the special event, you can toggle logic in 
flatMap2 to actually start doing stuff.
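For illustration, a minimal sketch of the toggling co-flatMap (simplified so that the marker is handled directly in flatMap1; the marker value and element types are placeholders, and checkpointing of the buffer is left out of the sketch):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class GatedCoFlatMap extends RichCoFlatMapFunction<String, String, String> {

    private boolean caughtUp = false;
    private final List<String> bufferedB = new ArrayList<>();

    @Override
    public void flatMap1(String a, Collector<String> out) {
        if ("HEAD_OF_STREAM".equals(a)) {
            // stream A reached its head: release everything buffered from B
            caughtUp = true;
            for (String b : bufferedB) {
                out.collect(b);
            }
            bufferedB.clear();
            return;
        }
        // ... normal processing of stream A, e.g. building up state ...
    }

    @Override
    public void flatMap2(String b, Collector<String> out) {
        if (caughtUp) {
            out.collect(b);      // stream A has caught up: process B immediately
        } else {
            bufferedB.add(b);    // not yet: hold the B tuple back
        }
    }
}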

Cheers,
Gordon
On February 9, 2017 at 8:09:33 PM, Jonas (jo...@huntun.de) wrote:

Hi! I have a job that uses a RichCoFlatMapFunction of two streams: A and B.

A
  .connect(B)
  .keyBy(_.id, _.id)
  .flatMap(new MyOp)

In MyOp, the A stream tuples are combined to form a state using a ValueStateDescriptor. Stream A is usually started from the beginning of a Kafka topic. Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s. A big drop.

What I now want is that while tuples from A are being processed in flatMap1, the stream B in flatMap2 should wait until the rate of the A stream has dropped, and only then should flatMap2 be called. Ideally, this behaviour would be captured in a separate operator, like a RateBasedStreamValve or something like that :)

To solve this, my idea is to add a counter/timer in the RichCoFlatMapFunction that counts how many tuples have been processed from A. If the rate drops below a threshold (here maybe 15 tuples/s), flatMap2, which processes tuples from B, empties the buffer. However, this would make my RichCoFlatMapFunction much bigger and would not allow for operator reuse in other scenarios.

I'm of course happy to answer if something is unclear.

-- Jonas
View this message in context: Start streaming tuples depending on another 
streams rate
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Where to put "pre-start" logic and how to detect recovery?

2017-02-09 Thread Tzu-Li (Gordon) Tai
Hi Dmitry,

I think currently the simplest way to do this is simply to add a
program argument as a flag indicating whether or not the current run
is from a savepoint (so you manually supply the flag whenever you’re starting the
job from a savepoint), and check that flag in the main method.
The main method will only be executed at the client once on every job submit,
and not on job auto restarts from checkpoints due to failures.
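For illustration, a minimal sketch of that idea; the "clean-start" flag name and the topic re-creation helper are hypothetical:

public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);

    // pass --clean-start true only when the job is NOT being started from a savepoint
    if (params.getBoolean("clean-start", false)) {
        recreateKafkaTopic();   // hypothetical helper that drops and re-creates the topic
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... build the topology ...
    env.execute("my-job");
}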

Cheers,
Gordon

On February 9, 2017 at 11:08:54 PM, Dmitry Golubets (dgolub...@gmail.com) wrote:

Hi,

I need to re-create a Kafka topic when a job is started in "clean" mode.
I can do it, but I'm not sure if I do it in the right place.

Is it fine to put this kind of code in the "main"?
Then it's called on every job submit.
But.. how to detect if a job is being started from a savepoint?

Or is there a different approach?

Best regards,
Dmitry

Re: Fink: KafkaProducer Data Loss

2017-02-02 Thread Tzu-Li (Gordon) Tai
Hi Ninad and Till,

Thank you for looking into the issue! This is actually a bug.

Till’s suggestion is correct:
The producer holds a `pendingRecords` value that is incremented on each 
invoke() and decremented on each callback, used to check if the producer needs 
to sync on pending callbacks on checkpoints.
On each checkpoint, we should only consider the checkpoint successful iff, after flushing, `pendingRecords == 0` and `asyncException == null` (currently, we’re only checking `pendingRecords`).

A quick fix for this is to check and rethrow async exceptions in the `snapshotState` method both before flushing and after flushing, once `pendingRecords` becomes 0.
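For illustration only (not the actual FLINK-5701 patch), a self-contained sketch of the invariant the fix enforces, i.e. a checkpoint may only pass once all pending records are acknowledged and no async exception was recorded by the callbacks:

class PendingRecordsTracker {

    private long pendingRecords = 0;
    private Exception asyncException;

    synchronized void onSend() {
        pendingRecords++;
    }

    synchronized void onCallback(Exception e) {
        if (e != null && asyncException == null) {
            asyncException = e;
        }
        pendingRecords--;
        notifyAll();
    }

    synchronized void onCheckpoint() throws Exception {
        checkErroneous();                   // rethrow errors seen before the checkpoint
        while (pendingRecords > 0) {
            wait();                         // "flush": wait for all outstanding callbacks
        }
        checkErroneous();                   // rethrow errors raised by those callbacks
    }

    private void checkErroneous() throws Exception {
        if (asyncException != null) {
            throw new Exception("Failed to send data to Kafka", asyncException);
        }
    }
}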
I’ve filed a JIRA for this: https://issues.apache.org/jira/browse/FLINK-5701.

Cheers,
Gordon

On February 3, 2017 at 6:05:23 AM, Till Rohrmann (trohrm...@apache.org) wrote:

Hi Ninad,

thanks for reporting the issue. To me it also looks as if exceptions might, under certain circumstances, go unnoticed. For example, if you have a write operation which fails, this will set the asyncException field, which is not checked before the next invoke call happens. If a checkpoint operation happens now, it will pass and mark all messages up to this point as successfully processed. Only after the checkpoint will the producer fail. And this constitutes data loss, imho.

I've looped Robert and Gordon into the conversation which are more familiar 
with the Kafka producer. Maybe they can answer your and my questions.

Cheers,
Till

On Thu, Feb 2, 2017 at 9:58 PM, ninad  wrote:
Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's
'retry' mechanism doesn't kick in until a message is added to its internal
buffer.

If there's an exception before that, KafkaProducer will throw that
exception, and it seems like Flink isn't handling that. In this case there will
be data loss.

Related Flink code (FlinkKafkaProducerBase):

if (logFailuresOnly) {
    callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
            }
            acknowledgeMessage();
        }
    };
}
else {
    callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null && asyncException == null) {
                asyncException = exception;
            }
            acknowledgeMessage();
        }
    };
}

Here are the scenarios we've identified that will cause data loss:

1. All Kafka brokers are down.

In this case, before appending a message to its buffer, KafkaProducer tries to fetch metadata. If the KafkaProducer isn't able to fetch the metadata within the configured timeout, it throws an exception.

2. Memory records not writable (existing bug in the Kafka 0.9.0.1 library):
https://issues.apache.org/jira/browse/KAFKA-3594

In both of the above cases, KafkaProducer won't retry, and Flink will ignore the messages. The messages aren't even logged. The exception is, but not the messages which failed.

Possible workarounds (Kafka settings):
- A very high value for the metadata timeout (metadata.fetch.timeout.ms)
- A very high value for buffer expiry (request.timeout.ms)

We're still investigating the possible side effects of changing the above Kafka settings.

So, is our understanding correct? Or is there a way we can avoid this data
loss by modifying some Flink settings?

Thanks.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.



Re: Support for Auto scaling

2017-02-01 Thread Tzu-Li (Gordon) Tai
Hi Sandeep!

While auto scaling jobs in Flink still isn’t possible, in Flink 1.2 you will be 
able to rescale jobs by stopping and restarting.
This works by taking a savepoint of the job before stopping it, and then redeploying the job with a higher / lower parallelism using the savepoint.
Upon restarting the job, your states will be redistributed across the new 
operators.

Changing operator / job parallelism on the fly while running is still on the 
future roadmap.

Cheers,
Gordon

On February 2, 2017 at 8:39:39 AM, Meghashyam Sandeep V 
(vr1meghash...@gmail.com) wrote:

Hi Guys,

I currently run Flink 1.1.4 streaming jobs on EMR in AWS with YARN. I understand that EMR supports auto scaling but Flink doesn't. Is there a plan for this support in 1.2?

Thanks,
Sandeep

Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

2017-01-18 Thread Tzu-Li (Gordon) Tai
Hi Andrew!

There’s nothing special about extending the checkpointing interfaces for the 
SinkFunction; for Flink they’re essentially user functions that have user state 
to be checkpointed.
So yes, you’ll just implement it as you would for a flatMap / map / etc. function.
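For illustration, a minimal sketch of a buffering sink that participates in checkpointing through the Checkpointed interface (the class name, batch size and flush target are hypothetical):

import java.util.ArrayList;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingSink extends RichSinkFunction<String>
        implements Checkpointed<ArrayList<String>> {

    private final ArrayList<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value) throws Exception {
        buffer.add(value);
        if (buffer.size() >= 100) {
            flush();                            // write the batch to the external system
        }
    }

    @Override
    public ArrayList<String> snapshotState(long checkpointId, long checkpointTimestamp) {
        // the not-yet-flushed elements become part of the checkpoint
        return new ArrayList<>(buffer);
    }

    @Override
    public void restoreState(ArrayList<String> state) {
        // on recovery, re-buffer the elements that had not been flushed yet
        buffer.addAll(state);
    }

    private void flush() {
        // ... write 'buffer' to the external system, then ...
        buffer.clear();
    }
}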

Feel free to let me know if you bump into any questions.

Cheers,
Gordon


On January 16, 2017 at 11:37:30 PM, Andrew Roberts (arobe...@fuze.com) wrote:

Hi Gordon,

Thanks for getting back to me. The ticket looks good, but I’m going to need to 
do something similar for our homegrown sinks. It sounds like just having the 
affected sinks participate in checkpointing is enough of a solution - is there 
anything special about `SinkFunction[T]` extending `Checkpointed[S]`, or can I 
just implement it as I would for e.g. a mapping function?

Thanks,

Andrew



On Jan 13, 2017, at 4:34 PM, Tzu-Li (Gordon) Tai  wrote:

Hi Andrew,

Your observations are correct. Like you mentioned, the current problem circles 
around how we deal with the pending buffered requests with accordance to 
Flink’s checkpointing.
I’ve filed a JIRA for this, as well as some thoughts for the solution in the 
description: https://issues.apache.org/jira/browse/FLINK-5487. What do you 
think?

Thank you for bringing this up! We should probably fix this soon.
There’s already some on-going effort in fixing some other aspects of proper 
at-least-once support in the Elasticsearch sinks, so I believe this will be 
brought to attention very soon too.

Cheers,
Gordon




On January 11, 2017 at 3:49:06 PM, Andrew Roberts (arobe...@fuze.com) wrote:

I’m trying to understand the guarantees made by Flink’s Elasticsearch sink in 
terms of message delivery. According to (1), the ES sink offers at-least-once 
guarantees. This page doesn’t differentiate between flink-elasticsearch and 
flink-elasticsearch2, so I have to assume for the moment that they both offer 
that guarantee. However, a look at the code (2) shows that the invoke() method 
puts the record into a buffer, and then that buffer is flushed to elasticsearch 
some time later.



Re: How to read from a Kafka topic from the beginning

2017-01-18 Thread Tzu-Li (Gordon) Tai
Hi,

Yes, Flink does not rely on consumer offset commits in Kafka / Zookeeper. It manages offsets as checkpointed state within Flink, and uses those offsets for exactly-once.
Currently the “auto.offset.reset” setting is passed into the internally used KafkaConsumer as a means to define the start position, but there’s work to further “de-couple” this so that respecting existing offsets in Kafka is merely one of the possible ways to define the start position for the consumer, and so that Flink isn’t depending on them in any way.
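For illustration, a minimal sketch of how the property is supplied, assuming the 0.9 consumer (topic name and group id are placeholders). Note that "auto.offset.reset" only kicks in when there are no committed offsets for the group and Flink is not restoring offsets from a checkpoint/savepoint:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
props.setProperty("auto.offset.reset", "earliest");   // "smallest" for the 0.8 consumer

DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props));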

There’s more detail on this in these JIRAs:
- https://issues.apache.org/jira/browse/FLINK-4280
- https://issues.apache.org/jira/browse/FLINK-3398

Cheers,
Gordon

On January 16, 2017 at 8:58:21 PM, Matthias J. Sax (mj...@apache.org) wrote:


I would put this differently: the "auto.offset.reset" policy is only used
if there are no valid committed offsets for a topic.

See here:
http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups-and-offset-management

(don't be confused about "earliest/latest" and "smallest/largest" --
the former is for Kafka 0.9+ and the latter for 0.8.2 -- but the
mechanism is the same)

But I thought Flink does not rely on consumer offset commits but does
"manual" offset management? So I am wondering whether this property is
passed into the Kafka source operator's Kafka consumer or not.


-Matthias


On 11/17/15 2:30 AM, Robert Metzger wrote:  
> Hi Will,  
>  
> In Kafka's consumer configuration [1] there is a configuration  
> parameter called "auto.offset.reset". Setting it to "smallest" will  
> tell the consumer to start reading a topic from the smallest  
> available offset.  
>  
> You can pass the configuration using the properties of the Kafka  
> consumer.  
>  
>  
> [1] http://kafka.apache.org/documentation.html#consumerconfigs  
>  
>  
> On Tue, Nov 17, 2015 at 8:55 AM, Miaoyongqiang (Will)  
> mailto:miaoyongqi...@huawei.com>>  
> wrote:  
>  
> Hi,  
>  
>  
> How can I tell a “FlinkKafkaConsumer” that I want to read from a  
> topic from the beginning?  
>  
> __ __  
>  
> Thanks,  
>  
> Will  
>  
>  


Re: Kafka KeyedStream source

2017-01-15 Thread Tzu-Li (Gordon) Tai
Hi Niels,

If it’s only for simple data filtering that does not depend on the key, a 
simple “flatMap” or “filter" directly after the source can be chained to the 
source instances.
What that does is that the filter processing will be done within the same 
thread as the one fetching data from a Kafka partition, hence no excessive 
network transfers for this simple filtering.
You can read more about operator chaining here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#tasks-and-operator-chains

So, what that sums up to is that you have a FlinkKafkaConsumer as source, do a 
filter transformation right after, and then a keyBy followed with your 
heavy-processing, key-wise computations.
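For illustration, a minimal sketch of that shape (topic name, schema, filter condition and key extraction are placeholders; the final flatMap stands for the hypothetical heavy, key-wise computation):

DataStream<String> clicks = env.addSource(
        new FlinkKafkaConsumer09<>("clickstream", new SimpleStringSchema(), props));

clicks
        // chained to the source instance: runs in the same thread that reads the Kafka partition
        .filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) {
                return !line.isEmpty();
            }
        })
        // records are only redistributed over the network from this point on
        .keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String line) {
                return line.split(",")[0];   // e.g. the sessionId field
            }
        })
        .flatMap(new MyHeavyPerKeyFunction());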
Does that make sense for what you have in mind?

Cheers,
Gordon

On January 11, 2017 at 4:11:26 PM, Niels Basjes (ni...@basjes.nl) wrote:

Hi,

Ok. I think I get it.

WHAT IF:
Assume we create an addKeyedSource(...) which will allow us to add a source that 
makes some guarantees about the data.
And assume this source returns simply the Kafka partition id as the result of 
this 'hash' function.
Then if I have 10 kafka partitions I would read these records in and I could 
filter the data more efficiently because the data would not need to go over the 
network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing that 
follows.

As a concept: Could that be made to work?

Niels 

On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai  wrote:
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion 
related to this before: [1].

I don’t think this is possible at the moment, mainly because of how the API is 
designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a 
hash partitioner that is used when deciding which instance of the following 
downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on 
“addSource”, redistribution of data can still happen. I.e., if the parallelism 
of the compute operators right after is different than the number of Kafka 
partitions, redistribution will happen to let the key space and state be evenly 
distributed in Flink.

This leads to the argument that we probably need to think about whether 
retaining the original partitioning of records in Kafka when consumed by Flink 
is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its 
operators regardless of the parallelism of Kafka topics (rescaling isn’t 
actually in yet, but is on the near-future roadmap).

So, under the general case, the parallelism of a Flink operator may be 
different than the number of Kafka partitions, and therefore redistributing 
must occur.
For redistribution to not need to take place right after an already partitioned 
Kafka topic, you’d need identical numbers of 1) Kafka partitions, 2) Flink 
source instances consuming the partitions, and 3) the parallelism of the keyed 
computation afterwards. This seems like a very specific situation, considering 
that you’ll be able to rescale Flink operators as the data’s key space / volume 
grows.

The main observation, I think, is that Flink itself maintains how the key space 
is partitioned within the system, which plays a crucial part in rescaling. 
That’s why by default it doesn’t respect existing partitioning of the key space 
in Kafka (or other external sources). Even if it initially does at the 
beginning of a job, partitioning will most likely change as you rescale your 
job / operators (which is a good thing, to be able to adapt).

Cheers,
Gordon

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/kafka-partition-assignment-td12123.html

On January 6, 2017 at 1:38:05 AM, Niels Basjes (ni...@basjes.nl) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the 
same sessionId into the same Kafka partition. That way I already have all 
events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I 
have to do a keyBy before my processing can continue. Such a keyBy will 
redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that 
immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes



--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Deduplicate messages from Kafka topic

2017-01-15 Thread Tzu-Li (Gordon) Tai
Hi,

You’re correct that the FlinkKafkaProducer may emit duplicates to Kafka topics, 
as it currently only provides at-least-once guarantees.
Note that this isn’t a restriction only in the FlinkKafkaProducer, but a 
general restriction for Kafka's message delivery.
This can definitely be improved to exactly-once (no duplicates produced into 
topics) once Kafka supports transactional messaging.

On the consumer side, the FlinkKafkaConsumer doesn’t have built-in support to 
dedupe the messages read from topics.
On the other hand this isn’t really feasible, as consumers could basically only 
view messages with different offsets as separate independent messages, unless 
identified by some user application-level logic.
So in the end, we’ll need to rely on the assumption that messages produced into 
Kafka topics are not duplicated, which as explained above, will hopefully be 
available in the near future.

Cheers,
Gordon

On January 14, 2017 at 6:12:29 PM, ljwagerfield (lawre...@dmz.wagerfield.com) 
wrote:

As I understand it, the Flink Kafka Producer may emit duplicates to Kafka  
topics.  

How can I deduplicate these messages when reading them back with Flink (via  
the Flink Kafka Consumer)?  

For example, is there any out-the-box support for deduplicating messages,  
i.e. by incorporating something like "idempotent producers" as proposed by  
Jay Kreps (which, as I understand it, involves maintaining a "high  
watermark" on a message-by-message level)?  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Deduplicate-messages-from-Kafka-topic-tp11051.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

2017-01-13 Thread Tzu-Li (Gordon) Tai
Hi Andrew,

Your observations are correct. Like you mentioned, the current problem circles 
around how we deal with the pending buffered requests with accordance to 
Flink’s checkpointing.
I’ve filed a JIRA for this, as well as some thoughts for the solution in the 
description: https://issues.apache.org/jira/browse/FLINK-5487. What do you 
think?

Thank you for bringing this up! We should probably fix this soon.
There’s already some on-going effort in fixing some other aspects of proper 
at-least-once support in the Elasticsearch sinks, so I believe this will be 
brought to attention very soon too.

Cheers,
Gordon




On January 11, 2017 at 3:49:06 PM, Andrew Roberts (arobe...@fuze.com) wrote:

I’m trying to understand the guarantees made by Flink’s Elasticsearch sink in 
terms of message delivery. According to (1), the ES sink offers at-least-once 
guarantees. This page doesn’t differentiate between flink-elasticsearch and 
flink-elasticsearch2, so I have to assume for the moment that they both offer 
that guarantee. However, a look at the code (2) shows that the invoke() method 
puts the record into a buffer, and then that buffer is flushed to elasticsearch 
some time later.



Re: Kafka topic partition skewness causes watermark not being emitted

2017-01-13 Thread Tzu-Li (Gordon) Tai
Hi,

This is expected behaviour due to how the per-partition watermarks are designed 
in the Kafka consumer, but I think it’s probably a good idea to handle idle 
partitions also when the Kafka consumer itself emits watermarks. I’ve filed a 
JIRA issue for this: https://issues.apache.org/jira/browse/FLINK-5479.

For the time being, I don’t think there will be an easy way to avoid this with 
the existing APIs, unfortunately. Is the skewed partition data intentional, or 
only for experimental purposes?

Best,
Gordon

On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao...@gmail.com) wrote:

Hi team,

I have a topic with 2 partitions in Kafka. I produced all data to partition 0 and no data to partition 1. I created a Flink job with parallelism 1 that consumes that topic and counts the events with a session event window (5 seconds gap). It turned out that the session event window was never closed even though I sent a message with a 10 minute gap. After digging into the source code, AbstractFetcher[1], which is responsible for sending watermarks downstream, calculates the min watermark of all partitions. Due to the fact that we don't have data in partition 1, the watermark returned from partition 1 is always Long.MIN_VALUE, therefore AbstractFetcher never fires the watermark downstream.

I want to know if this is expected behavior or a bug. If this is expected 
behavior how do I avoid the delay of watermark firing when data is not evenly 
distributed to all partitions?

This is the timestamp extractor I used

public class ExactTimestampExtractor implements AssignerWithPeriodicWatermarks<SessionEvent> {

    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE : currentMaxTimestamp - 1);
    }

    @Override
    public long extractTimestamp(SessionEvent element, long previousElementTimestamp) {
        long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
        if (eventStartTime > currentMaxTimestamp) {
            currentMaxTimestamp = eventStartTime;
        }

        return eventStartTime;
    }
}

and this is the Flink topo

// get input data
Properties props = new Properties();   // Kafka consumer config (placeholder)
FlinkKafkaConsumer010<SessionEvent> consumer =
        new FlinkKafkaConsumer010<>("topic4", new MyOwnSchema(), props);
consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
DataStream<SessionEvent> input = env.addSource(consumer);

input
        .keyBy("id")
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .reduce(new Reducer(), new WindowFunction())
        .print();

// execute program
env.execute("a job");

I used the latest code in github

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539


Re: Kafka 09 consumer does not commit offsets

2017-01-09 Thread Tzu-Li (Gordon) Tai
Good to know!


On January 10, 2017 at 1:06:29 PM, Renjie Liu (liurenjie2...@gmail.com) wrote:

Hi, all:
I used the Kafka 0.10 connector and the problem is fixed. I think this may be caused by an incompatibility between the 0.9 consumer and the 0.10 broker.
Thanks Henri and Gordon.

On Tue, Jan 10, 2017 at 4:46 AM Henri Heiskanen  
wrote:
Hi,

We had the same problem when running 0.9 consumer against 0.10 Kafka. Upgrading 
Flink Kafka connector to 0.10 fixed our issue.

Br,
Henkka

On Mon, Jan 9, 2017 at 5:39 PM, Tzu-Li (Gordon) Tai  wrote:
Hi,

Not sure what might be going on here. I’m pretty certain that for 
FlinkKafkaConsumer09 when checkpointing is turned off, the internally used 
KafkaConsumer client will auto commit offsets back to Kafka at a default 
interval of 5000ms (the default value for “auto.commit.interval.ms”).

Could you perhaps provide the logs of your job (you can send them to me 
privately if you prefer to)?
From the logs we should be able to see if the internal KafkaConsumer client is 
correctly configured to auto commit and also check if anything strange is going 
on.

Also, how are you reading the committed offsets in Kafka? I recall there was a 
problem with the 08 consumer that resulted in the Kafka cli not correctly 
showing committed offsets of consumer groups.
However, the 08 consumer had this problem only because we had to implement the 
auto offset committing ourselves. I don’t think this should be an issue for the 
09 consumer, since we’re solely relying on the Kafka client’s own 
implementation to do the auto offset committing.

Cheers,
Gordon


On January 9, 2017 at 7:55:33 PM, Timo Walther (twal...@apache.org) wrote:

I'm not a Kafka expert but maybe Gordon (in CC) knows more.

Timo


Am 09/01/17 um 11:51 schrieb Renjie Liu:
> Hi, all:
> I'm using flink 1.1.3 and kafka consumer 09. I read its code and it
> says that the kafka consumer will turn on auto offset commit if
> checkpoint is not enabled. I've turned off checkpoint and it seems
> that the kafka client is not committing offsets to kafka? The offset is
> important for helping us monitoring. Anyone has encountered this before?
> --
> Liu, Renjie
> Software Engineer, MVAD



--
Liu, Renjie
Software Engineer, MVAD

Re: Kafka 09 consumer does not commit offsets

2017-01-09 Thread Tzu-Li (Gordon) Tai
Hi,

Not sure what might be going on here. I’m pretty certain that for 
FlinkKafkaConsumer09 when checkpointing is turned off, the internally used 
KafkaConsumer client will auto commit offsets back to Kafka at a default 
interval of 5000ms (the default value for “auto.commit.interval.ms”).

Could you perhaps provide the logs of your job (you can send them to me 
privately if you prefer to)?
From the logs we should be able to see if the internal KafkaConsumer client is 
correctly configured to auto commit and also check if anything strange is going 
on.

Also, how are you reading the committed offsets in Kafka? I recall there was a 
problem with the 08 consumer that resulted in the Kafka cli not correctly 
showing committed offsets of consumer groups.
However, the 08 consumer had this problem only because we had to implement the 
auto offset committing ourselves. I don’t think this should be an issue for the 
09 consumer, since we’re solely relying on the Kafka client’s own 
implementation to do the auto offset committing.

Cheers,
Gordon


On January 9, 2017 at 7:55:33 PM, Timo Walther (twal...@apache.org) wrote:

I'm not a Kafka expert but maybe Gordon (in CC) knows more.  

Timo  


Am 09/01/17 um 11:51 schrieb Renjie Liu:  
> Hi, all:  
> I'm using flink 1.1.3 and kafka consumer 09. I read its code and it  
> says that the kafka consumer will turn on auto offset commit if  
> checkpoint is not enabled. I've turned off checkpoint and it seems  
> that the kafka client is not committing offsets to kafka? The offset is  
> important for helping us monitoring. Anyone has encountered this before?  
> --  
> Liu, Renjie  
> Software Engineer, MVAD  




Re: Kafka KeyedStream source

2017-01-09 Thread Tzu-Li (Gordon) Tai
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion 
related to this before: [1].

I don’t think this is possible at the moment, mainly because of how the API is 
designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a 
hash partitioner that is used when deciding which instance of the following 
downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on 
“addSource”, redistribution of data can still happen. I.e., if the parallelism 
of the compute operators right after is different than the number of Kafka 
partitions, redistribution will happen to let the key space and state be evenly 
distributed in Flink.

This leads to the argument that we probably need to think about whether 
retaining the original partitioning of records in Kafka when consumed by Flink 
is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its 
operators regardless of the parallelism of Kafka topics (rescaling isn’t 
actually in yet, but is on the near-future roadmap).

So, under the general case, the parallelism of a Flink operator may be 
different than the number of Kafka partitions, and therefore redistributing 
must occur.
For redistribution to not need to take place right after an already partitioned 
Kafka topic, you’d need identical numbers of 1) Kafka partitions, 2) Flink 
source instances consuming the partitions, and 3) the parallelism of the keyed 
computation afterwards. This seems like a very specific situation, considering 
that you’ll be able to rescale Flink operators as the data’s key space / volume 
grows.

The main observation, I think, is that Flink itself maintains how the key space 
is partitioned within the system, which plays a crucial part in rescaling. 
That’s why by default it doesn’t respect existing partitioning of the key space 
in Kafka (or other external sources). Even if it initially does at the 
beginning of a job, partitioning will most likely change as you rescale your 
job / operators (which is a good thing, to be able to adapt).

Cheers,
Gordon

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/kafka-partition-assignment-td12123.html

On January 6, 2017 at 1:38:05 AM, Niels Basjes (ni...@basjes.nl) wrote:

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the 
same sessionId into the same Kafka partition. That way I already have all 
events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream ontop of which I 
have to do a keyBy before my processing can continue. Such a keyBy will 
redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that 
immediately produces a keyed data stream?
 

--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Joining two kafka streams

2017-01-08 Thread Tzu-Li (Gordon) Tai
Hi Igor!

What you can actually do is let a single FlinkKafkaConsumer consume from both 
topics, producing a single DataStream which you can keyBy afterwards.
All versions of the FlinkKafkaConsumer support consuming multiple Kafka topics 
simultaneously. This is logically the same as union and then a keyBy, like what 
you described.
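For illustration, a minimal sketch, assuming both topics carry the same String records (topic names, group id and key extraction are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "joined-consumer");

// one consumer subscribed to both topics; records from both end up in a single DataStream
DataStream<String> both = env.addSource(new FlinkKafkaConsumer09<>(
        Arrays.asList("topicA", "topicB"), new SimpleStringSchema(), props));

// key by the shared field so that the same keys are processed by the same subtask
KeyedStream<String, String> keyed = both.keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String value) {
        return value.split(",")[0];
    }
});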

Note that this approach requires that the records in both of your Kafka topics are of the same type when consumed into Flink (e.g., the same POJO class, or simply both as Strings, etc.).
If that isn’t possible and you have different data types / schemas for the 
topics, you’d probably need to use “connect” and then a keyBy.

If you’re applying a window directly after joining the two topic streams, you 
could also use a window join:
dataStream.join(otherStream)
    .where().equalTo()
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});
The “where” specifies how to select the key from the first stream, and 
“equalTo” the second one.

Hope this helps, let me know if you have other questions!

Cheers,
Gordon

On January 9, 2017 at 4:06:34 AM, igor.berman (igor.ber...@gmail.com) wrote:

Hi,  
I have a use case where I need to join two Kafka topics together by some fields.  
In general, I could put the content of one topic into the other and partition by  
the same key, but I can't touch those two topics (i.e. there are other consumers  
of those topics). On the other hand, it's essential to process the same keys on the  
same "thread" to achieve locality and avoid races when working with the same  
key from different machines/threads.  

My idea is to use a union of the two streams and then key by the field,  
but is there a better approach to achieve "locality"?  

Any input will be appreciated.  
Igor  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-kafka-streams-tp10912.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Serializers and Schemas

2016-12-07 Thread Tzu-Li (Gordon) Tai
Hi Matt,

1. There’s some in-progress work on wrapper util classes for Kafka de/serializers here [1] that allow Kafka de/serializers to be used with the Flink Kafka Consumers/Producers with minimal user overhead.
The PR also has some proposed additions to the documentation for the wrappers.

2. I feel that it would be good to have more documentation on Flink’s de/serializers because they’ve been frequently asked about on the mailing lists. At the same time, the fastest / most efficient de/serialization approach would probably be tailored to each use case, so we’d need to think more about the presentation and the purpose of the documentation.
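Regarding question 2 further below, a minimal sketch of a combined schema for such a POJO, assuming Jackson is on the classpath (JSON is just one possible choice here):

import java.io.IOException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class ProductSchema implements DeserializationSchema<Product>, SerializationSchema<Product> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public byte[] serialize(Product element) {
        try {
            return MAPPER.writeValueAsBytes(element);    // POJO -> JSON bytes for the producer
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Could not serialize product", e);
        }
    }

    @Override
    public Product deserialize(byte[] message) throws IOException {
        return MAPPER.readValue(message, Product.class); // JSON bytes -> POJO for the consumer
    }

    @Override
    public boolean isEndOfStream(Product nextElement) {
        return false;                                    // the Kafka stream never ends
    }

    @Override
    public TypeInformation<Product> getProducedType() {
        return TypeInformation.of(Product.class);
    }
}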

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/2705

On December 8, 2016 at 5:00:19 AM, milind parikh (milindspar...@gmail.com) 
wrote:

Why not use a self-describing format  (json), stream as String and read through 
a json reader and avoid top-level reflection?

Github.com/milindparikh/streamingsi

https://github.com/milindparikh/streamingsi/tree/master/epic-poc/sprint-2-simulated-data-no-cdc-advanced-eventing/2-dataprocessing

?

Apologies if I misunderstood the question. But I can quite see how to model 
your Product class (or indeed POJO) in a fairly generic way ( assumes JSON).

The real issues faced when you have different versions of same POJO class 
requires storing enough information to dynamically instantiate the actual 
version of the class; which I believe is beyond the simple use case.

Milind

On Dec 7, 2016 2:42 PM, "Matt"  wrote:
I've read your example, but I've found the same problem. You're serializing 
your POJO as a string, where all fields are separated by "\t". This may work 
for you, but not in general.

https://github.com/luigiselmi/pilot-sc4-fcd-producer/blob/master/src/main/java/eu/bde/sc4pilot/fcd/FcdTaxiEvent.java#L60

I would like to see a more "generic" approach for the class Product in my last 
message. I believe a more general purpose de/serializer for POJOs should be 
possible to achieve using reflection.

On Wed, Dec 7, 2016 at 1:16 PM, Luigi Selmi  wrote:
Hi Matt,

I had the same problem, trying to read some records in event time using a POJO, 
doing some transformation and save the result into Kafka for further 
processing. I am not yet done but maybe the code I wrote starting from the 
Flink Forward 2016 training docs could be useful.

https://github.com/luigiselmi/pilot-sc4-fcd-producer


Best,

Luigi 

On 7 December 2016 at 16:35, Matt  wrote:
Hello,

I don't quite understand how to integrate Kafka and Flink; after a lot of thought and hours of reading I feel I'm still missing something important.

So far I haven't found a non-trivial but simple example of a stream of a custom 
class (POJO). It would be good to have such an example in Flink docs, I can 
think of many many scenarios in which using SimpleStringSchema is not an 
option, but all Kafka+Flink guides insist on using that.

Maybe we can add a simple example to the documentation [1], it would be really 
helpful for many of us. Also, explaining how to create a Flink De/SerializationSchema from a Kafka De/Serializer would be really useful and would save a lot of people a lot of time; it's not clear why you need both of them, or whether you need both of them.

As far as I know Avro is a common choice for serialization, but I've read 
Kryo's performance is much better (true?). I guess though that the fastest 
serialization approach is writing your own de/serializer.

1. What do you think about adding some thoughts on this to the documentation?
2. Can anyone provide an example for the following class?

---
public class Product {
    public String code;
    public double price;
    public String description;
    public long created;
}
---

Regards,
Matt

[1] http://data-artisans.com/kafka-flink-a-practical-how-to/



--
Luigi Selmi, M.Sc.
Fraunhofer IAIS Schloss Birlinghoven . 
53757 Sankt Augustin, Germany
Phone: +49 2241 14-2440




Re: Partitioning operator state

2016-12-07 Thread Tzu-Li (Gordon) Tai
Hi Dominik,

Do you mean how Flink redistributes an operator’s state when the parallelism of 
the operator is changed?
If so, you can take a look at [1] and [2].

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-3755
[2] 
https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#

On December 8, 2016 at 4:40:18 AM, Dominik Safaric (dominiksafa...@gmail.com) 
wrote:

Hi everyone,  

In the case of scaling out a Flink cluster, how does Flink handle operator 
state partitioning of a staged topology?  

Regards,  
Dominik  



Re: Flink Kafka producer with a topic per message

2016-12-07 Thread Tzu-Li (Gordon) Tai
Hi,

The FlinkKafkaProducers currently support a per-record topic through the 
user-provided serialization schema, which has a "getTargetTopic(T element)" 
method that is called for each record. The partition the record is sent to is 
decided by a custom KafkaPartitioner, which is also provided 
by the user when creating a FlinkKafkaProducer.
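
For illustration, a minimal sketch of such a schema; the Tuple2 encoding (first field carries the target topic, second field the payload) is just an assumption for this example:

---
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Elements are Tuple2<targetTopic, payload>.
public class PerRecordTopicSchema implements KeyedSerializationSchema<Tuple2<String, String>> {

    @Override
    public byte[] serializeKey(Tuple2<String, String> element) {
        return null; // no key in this sketch
    }

    @Override
    public byte[] serializeValue(Tuple2<String, String> element) {
        return element.f1.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(Tuple2<String, String> element) {
        // A non-null return value overrides the producer's default topic for this record.
        return element.f0;
    }
}
---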

Does this already provide the functionality you’ve mentioned? Or have I 
misunderstood what you have in mind?

Cheers,
Gordon


On December 7, 2016 at 5:55:24 PM, Sanne de Roever (sanne.de.roe...@gmail.com) 
wrote:

The next step would be to determine the impact on the interface of a Sink. 
Currently a Kafka sink has one topic, for example:



Re: Why use Kafka after all?

2016-11-22 Thread Tzu-Li (Gordon) Tai
Hi Matt,

Just to be clear, what I'm looking for is a way to serialize a POJO class for 
Kafka but also for Flink, I'm not sure the interface of both frameworks are 
compatible but it seems they aren't.

For Kafka (producer) I need a Serializer and a Deserializer class, and for 
Flink (consumer) a SerializationSchema and DeserializationSchema class.

Any example of how to put this together would be greatly appreciated.

There’s actually a related JIRA for this: 
https://issues.apache.org/jira/browse/FLINK-4050.
The corresponding PR is https://github.com/apache/flink/pull/2705, which adds 
wrappers for the Kafka serializers.
Is this feature what you’re probably looking for?

Best Regards,
Gordon


On November 18, 2016 at 12:11:23 PM, Matt (dromitl...@gmail.com) wrote:

Just to be clear, what I'm looking for is a way to serialize a POJO class for 
Kafka but also for Flink, I'm not sure the interface of both frameworks are 
compatible but it seems they aren't.

For Kafka (producer) I need a Serializer and a Deserializer class, and for 
Flink (consumer) a SerializationSchema and DeserializationSchema class.

Any example of how to put this together would be greatly appreciated.

On Thu, Nov 17, 2016 at 9:12 PM, Dromit  wrote:
Tzu-Li Tai, thanks for your response.

I've seen the example you mentioned before, TaxiRideSchema.java, but it's way 
too simplified.

In a real POJO class you may have multiple fields such as integers, strings, 
doubles, etc. So serializing them as a single string like in the example wouldn't 
work in general (you can't concatenate two arbitrary strings and later split the 
byte array to get each of them back; the same goes for two integers, or nearly any other types).

I feel there should be a more general way of doing this regardless of the 
fields on the class you're de/serializing.

What do you do in these cases? It should be a pretty common scenario!
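
For what it's worth, one way to handle multi-field POJOs without any textual delimiter is to write each field with a DataOutputStream, since writeUTF() length-prefixes strings. A rough sketch, where the Event POJO and its fields are made up purely for illustration:

---
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class EventSchema implements DeserializationSchema<EventSchema.Event>,
        SerializationSchema<EventSchema.Event> {

    // Made-up POJO with mixed field types, only for illustration.
    public static class Event {
        public String id;
        public double value;
        public long timestamp;
    }

    @Override
    public byte[] serialize(Event e) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeUTF(e.id);        // writeUTF length-prefixes the string
            out.writeDouble(e.value);
            out.writeLong(e.timestamp);
            out.flush();
            return bos.toByteArray();
        } catch (IOException ex) {
            throw new RuntimeException("Could not serialize event", ex);
        }
    }

    @Override
    public Event deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        Event e = new Event();
        e.id = in.readUTF();
        e.value = in.readDouble();
        e.timestamp = in.readLong();
        return e;
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeExtractor.getForClass(Event.class);
    }
}
---

An instance of such a class can be handed both to the Kafka consumer (as a DeserializationSchema) and to the Kafka producer (as a SerializationSchema).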

Regards,
Matt

On Wed, Nov 16, 2016 at 2:01 PM, Philipp Bussche  
wrote:
Hi Dromit

I started using Flink with Kafka but am currently looking into Kinesis to
replace Kafka.
The reason behind this is that eventually my application will run in
somebody's cloud and if I go for AWS then I don't have to take care of
operating Kafka and Zookeeper myself. I understand this can be a challenging
task.
Up to know where the Kafka bit is only running in a local test environment I
am happy running it as I just start 2 Docker containers and it does the job.
But this also means I have no clue how Kafka really works and what I need to
be careful with.
Besides knowledge which is required as it seems for Kafka costs is another
aspect here.
If one wants to operate a Kafka cluster plus Zookeeper on let's say the
Amazon cloud this might actually be more expensive than "just" using Kinesis
as a service.
There are apparently draw backs in terms of functionality and performance
but for my use case that does not seem to matter.

Philipp



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-use-Kafka-after-all-tp10112p10155.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.




Re: flink-dist shading

2016-11-18 Thread Tzu-Li (Gordon) Tai
Hi Craig,

I think the email wasn't sent to the ‘dev’ list, somehow.

Have you tried this:

mvn clean install -DskipTests
# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so 
we need to run mvn for flink-dist again.
cd flink-dist
mvn clean install -DskipTests
I agree that it’ll affect downstream users who need to build Flink themselves, 
and it would be best if it could be resolved.
The above is still more or less a “workaround”, but since I don’t really know 
why the newer Maven versions won’t shade properly, we’ll probably need to wait 
for others more knowledgeable about the build infrastructure to chime in and 
see if there’s a good long-term solution.

Best Regards,
Gordon
On November 19, 2016 at 8:48:32 AM, Foster, Craig (foscr...@amazon.com) wrote:

I’m not even sure this was delivered to the ‘dev’ list but I’ll go ahead and 
forward the same email to the ‘user’ list since I haven’t seen a response.

---

 

I’m following up on the issue in FLINK-5013 about flink-dist specifically 
requiring Maven 3.0.5 through to <3.3. This affects people who build Flink with 
BigTop (not only EMR), so I’m wondering about the context and how we can 
properly shade the Apache HTTP libraries so that flink-dist can be built with a 
current version of Maven. Any insight into this would be helpful.

 

Thanks!

Craig

 

Re: Subtask keeps on discovering new Kinesis shard when using Kinesalite

2016-11-16 Thread Tzu-Li (Gordon) Tai
Hi Phillip,

Thanks for testing it. From your log and my own tests, I can confirm the 
problem is with Kinesalite not correctly
mocking the official Kinesis behaviour for the `describeStream` API.

There’s a PR for the fix here: https://github.com/apache/flink/pull/2822. With 
this change, shard discovery
should work normally when tested against Kinesalite.

However, I’m not completely sure yet if the fix is viable, and would like to 
wait for others to take a look / review.
Therefore, it might not make it into the next Flink minor bugfix release. If 
you’d like, you can try out the patch for now
and see if the problem remains.

Best Regards,
Gordon

On November 17, 2016 at 1:07:44 AM, Philipp Bussche (philipp.buss...@gmail.com) 
wrote:

Hello Gordon,  

thank you for your help. I have set the discovery interval to 30 seconds and  
just starting the job on a clean kinesalite service (I am running it inside  
docker so every time the container gets stopped and removed to start from  
scratch).  

This is the output without actually any data in the stream:  

11/16/2016 17:59:03 Source: Custom Source -> Sink: Unnamed(1/1) switched to 
 
RUNNING  
17:59:04,673 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 will be seeded with initial shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'}, starting  
state set as sequence number LATEST_SEQUENCE_NUM  
17:59:04,674 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 will start consuming seeded shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'} from sequence  
number LATEST_SEQUENCE_NUM with ShardConsumer 0  
17:59:04,689 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 has discovered a new shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'} due to  
resharding, and will start consuming the shard from sequence number  
EARLIEST_SEQUENCE_NUM with ShardConsumer 1  
17:59:08,817 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 1 @ 1479315548815  
17:59:08,835 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 1 (in 20 ms)  
17:59:13,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 2 @ 1479315553815  
17:59:13,817 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 2 (in 1 ms)  
17:59:18,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 3 @ 1479315558814  
17:59:18,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 3 (in 1 ms)  
17:59:23,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 4 @ 1479315563815  
17:59:23,816 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 4 (in 1 ms)  
17:59:28,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 5 @ 1479315568813  
17:59:28,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 5 (in 1 ms)  
17:59:33,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 6 @ 1479315573814  
17:59:33,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 6 (in 1 ms)  
17:59:34,704 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 has discovered a new shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'} due to  
resharding, and will start consuming the shard from sequence number  
EARLIEST_SEQUENCE_NUM with ShardConsumer 2  

I then restarted the kinesalite container and posted a message to the stream  
before the 30 second mark occurred. The output shows that the job consumes  
from the 2 shards discovered initially (I initialized kinsalite with one  
shard only) right away and then continues to consume f

Re: Why use Kafka after all?

2016-11-15 Thread Tzu-Li (Gordon) Tai
Hi Matt,

Here’s an example of writing a DeserializationSchema for your POJOs: [1].

As for simply writing messages from WebSocket to Kafka using a Flink job, while 
it is absolutely viable, I would not recommend it,
mainly because you’d never know if you might need to temporarily shut down 
Flink jobs (perhaps for a version upgrade).

Shutting down the WebSocket consuming job, would then, of course, lead to 
missing messages during the shutdown time.
It would be perhaps simpler if you have a separate Kafka producer application 
to directly ingest messages from the WebSocket to Kafka.
You wouldn’t want this application to be down at all, so that all messages can 
safely land into Kafka first. I would recommend to keep this part
as simple as possible.

From there, like Till explained, your Flink processing pipelines can rely on 
Kafka’s replayability to provide exactly-once processing guarantees on your 
data.

Best,
Gordon


[1] 
https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/utils/TaxiRideSchema.java




On November 16, 2016 at 1:07:12 PM, Dromit (dromitl...@gmail.com) wrote:

"So in your case I would directly ingest my messages into Kafka"

I will do that through a custom SourceFunction that reads the messages from the 
WebSocket, creates simple java objects (POJOs) and sink them in a Kafka topic 
using a FlinkKafkaProducer, if that makes sense.
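
For illustration, a skeleton of such a SourceFunction; the WebSocket client itself is omitted, and readNextMessage() is just a hypothetical placeholder for the blocking read:

---
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class WebSocketSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // open the WebSocket connection here (client library omitted in this sketch)
        while (running) {
            String message = readNextMessage(); // hypothetical blocking read
            ctx.collect(message);               // a downstream map() can build the POJO
        }
    }

    @Override
    public void cancel() {
        running = false;
        // close the WebSocket connection here
    }

    private String readNextMessage() throws Exception {
        throw new UnsupportedOperationException("plug in the WebSocket client here");
    }
}
---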

The problem now is I need a DeserializationSchema for my class. I read Flink is 
able to de/serialize POJO objects by its own, but I'm still required to provide 
a serializer to create the FlinkKafkaProducer (and FlinkKafkaConsumer).

Any idea or example? Should I create a DeserializationSchema for each POJO 
class I want to put into a Kafka stream?



On Tue, Nov 15, 2016 at 7:43 AM, Till Rohrmann  wrote:
Hi Matt,

as you've stated Flink is a stream processor and as such it needs to get its 
inputs from somewhere. Flink can provide you up to exactly-once processing 
guarantees. But in order to do this, it requires a re-playable source because 
in case of a failure you might have to reprocess parts of the input you had 
already processed prior to the failure. Kafka is such a source and people use 
it because it happens to be one of the most popular and widespread open source 
message queues/distributed logs.

If you don't require strong processing guarantees, then you can simply use the 
WebSocket source. But, for any serious use case, you probably want to have 
these guarantees because otherwise you just might calculate bogus results. So 
in your case I would directly ingest my messages into Kafka and then let Flink 
read from the created topic to do the processing.

Cheers,
Till

On Tue, Nov 15, 2016 at 8:14 AM, Dromit  wrote:
Hello,

As far as I've seen, there are a lot of projects using Flink and Kafka 
together, but I'm not seeing the point of that. Let me know what you think 
about this.

1. If I'm not wrong, Kafka provides basically two things: storage (records 
retention) and fault tolerance in case of failure, while Flink mostly cares 
about the transformation of such records. That means I can write a pipeline 
with Flink alone, and even distribute it on a cluster, but in case of failure 
some records may be lost, or I won't be able to reprocess the data if I change 
the code, since the records are not kept in Flink by default (only when sinked 
properly). Is that right?

2. In my use case the records come from a WebSocket and I create a custom class 
based on messages on that socket. Should I put those records inside a Kafka 
topic right away using a Flink custom source (SourceFunction) with a Kafka sink 
(FlinkKafkaProducer), and independently create a Kafka source (KafkaConsumer) 
for that topic and pipe the Flink transformations there? Is that data flow fine?

Basically what I'm trying to understand with both question is how and why 
people are using Flink and Kafka.

Regards,
Matt




Re: Subtask keeps on discovering new Kinesis shard when using Kinesalite

2016-11-15 Thread Tzu-Li (Gordon) Tai
Hi Philipp,

When used against Kinesalite, can you tell if the connector is already reading 
data from the test shard before any
of the shard discovery messages? If you have any spare time to test this, you 
can set a larger value for the
`ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the config 
properties to make this easier to test.  
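
For example, a sketch of the consumer configuration (credential and endpoint settings for Kinesalite are omitted here, and the interval value is only an example):

---
Properties config = new Properties();
config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
// A larger discovery interval (e.g. 60000 ms) makes it easier to tell the
// initial shard consumption apart from the periodic discovery in the logs.
config.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "60000");

FlinkKinesisConsumer<String> consumer =
        new FlinkKinesisConsumer<>("TestKinesisStream", new SimpleStringSchema(), config);
---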

If yes, I’m suspecting the problem is that Kinesalite hasn’t stuck to the 
actual Kinesis behaviour for some of the APIs.
Specifically, I think the problem is with the `describeStream(streamName, 
lastSeenShardId)` Kinesis API, where the
expected behaviour is that the returned shard list only contains shardIds after 
`lastSeenShardId`. Perhaps Kinesalite
didn’t follow this part of the behaviour. That’s why the connector kept on 
treating the shard as a newly discovered one.

I’ll investigate and try to reproduce the problem, and see if there’s a good 
way to workaround this for Kinesalite.
Thank you for reporting the issue, I’ve filed up a JIRA 
(https://issues.apache.org/jira/browse/FLINK-5075) for this.

Best,
Gordon


On November 16, 2016 at 5:03:17 AM, Philipp Bussche (philipp.buss...@gmail.com) 
wrote:

has discovered a new shard 
KinesisStreamShard

Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-03 Thread Tzu-Li (Gordon) Tai
Hi Josh,

That warning message was added as part of FLINK-4514. It pops up whenever a 
shard iterator is used more than 5 minutes after it was returned from Kinesis.
The only time spent between getting a shard iterator back and using it to fetch 
the next batch of records is on deserializing and emitting the records of the 
last fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this 
normally shouldn’t happen.

Have you noticed any sign of long, constant full GC on your Flink task 
managers? From your description and a check of the code, the only possible guess I 
can come up with now is that
the source tasks completely ceased to run for a period of time, and when 
they came back, the shard iterator was unexpectedly found to be expired. 
According to the graph you attached,
when the iterator was refreshed and the tasks successfully fetched a few more 
batches, the source tasks halted again, and so on.
So you should see that same warning message right before every small peak 
within the graph.

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no 
problems, but yesterday the Kinesis consumer started behaving strangely... My 
Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink 
Kinesis consumer started to stop consuming for periods of time (see the spikes 
in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is this 
log message which I believe is related to the problem:
2016-11-03 09:27:53,782 WARN  
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - 
Encountered an unexpected expired iterator 
AAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
 for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
85070511730234615865841151857942042863},SequenceNumberRange: 
{StartingSequenceNumber: 
49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the 
iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine 
again with no problems.

Do you have any idea what might be causing this, or anything I should do to 
investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai  wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the 
release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing 
this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can 
also build on the current “release-1.1” branch, which already has FLINK-4514 
merged.
Sorry for the inconvenience. Let me know if you bump into any other problems 
afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (stef...@hausmann-family.de) 
wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator 
> expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while 
> right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future 
> than the tolerated delay of 30 milliseconds. (Service: AmazonKinesis; 
> Status Code: 400; Error Code: ExpiredIteratorException; Request ID: 
> dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resolved for
Flink 1.1.2. In contrast to what is described in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen



Re: A question regarding to the checkpoint mechanism

2016-10-16 Thread Tzu-Li (Gordon) Tai
Users don’t need to explicitly make a copy of the state. Take checkpointing 
instance fields as operator state for example [1].
You simply return your current state in `snapshotState()`, and Flink will take 
care of snapshotting and persisting it to the state backend.
The persisting process does not block processing of input records if you 
implement the `CheckpointedAsynchronously` interface (which is usually the more 
desirable case).
The same goes for key-partitioned states.
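
For reference, a minimal sketch of checkpointing an instance field this way (the function and its field are only illustrative):

---
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;

public class CountingMapper implements MapFunction<String, String>,
        CheckpointedAsynchronously<Long> {

    private long count = 0;

    @Override
    public String map(String value) {
        count++;
        return value;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        // Returning the field is all that is needed; Flink persists it
        // asynchronously to the state backend.
        return count;
    }

    @Override
    public void restoreState(Long state) {
        count = state;
    }
}
---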

Best Regards,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html#checkpointing-instance-fields

On October 17, 2016 at 11:32:07 AM, Li Wang (wangli1...@gmail.com) wrote:

Hi Gordon,

Thanks for your prompt reply.
So do you mean when we are about to checkpoint the state of an operator, we 
first copy its state and then checkpoint the copied state while the operator 
continues processing?

Thanks,
Li


On Oct 17, 2016, at 11:10 AM, Tzu-Li (Gordon) Tai  wrote:

Hi!

No, the operator does not need to pause processing input records while the 
checkpointing of its state is in progress.
The checkpointing of operator state is asynchronous. The operator state does 
not need to be immutable, since it’s a copy of the snapshot state that’s 
checkpointed.

Regards,
Gordon


On October 17, 2016 at 10:28:34 AM, Li Wang (wangli1...@gmail.com) wrote:

Hi All, 

Any feedback is highly appreciated. 

Thanks. 
Li 

> On Oct 15, 2016, at 11:17 AM, Li Wang  wrote: 
>  
> Hi all, 
>  
> As far as I know, a stateful operator will checkpoint its current state to a 
> persistent storage when it receives all the barrier from all of its upstream 
> operators. My question is that does the operator doing the checkpoint need to 
> pause processing the input tuples for the next batch until the checkpoint is 
> done? If yes, will it introduce significant processing latency when the state 
> is large. If no, does this need the operator state to be immutable? 
>  
> Thanks, 
> Li



Re: A question regarding to the checkpoint mechanism

2016-10-16 Thread Tzu-Li (Gordon) Tai
Hi!

No, the operator does not need to pause processing input records while the 
checkpointing of its state is in progress.
The checkpointing of operator state is asynchronous. The operator state does 
not need to be immutable, since it’s a copy of the snapshot state that’s 
checkpointed.

Regards,
Gordon


On October 17, 2016 at 10:28:34 AM, Li Wang (wangli1...@gmail.com) wrote:

Hi All,  

Any feedback is highly appreciated.  

Thanks.  
Li  

> On Oct 15, 2016, at 11:17 AM, Li Wang  wrote:  
>  
> Hi all,  
>  
> As far as I know, a stateful operator will checkpoint its current state to a 
> persistent storage when it receives all the barrier from all of its upstream 
> operators. My question is that does the operator doing the checkpoint need to 
> pause processing the input tuples for the next batch until the checkpoint is 
> done? If yes, will it introduce significant processing latency when the state 
> is large. If no, does this need the operator state to be immutable?  
>  
> Thanks,  
> Li  



Re: ExpiredIteratorException when reading from a Kinesis stream

2016-10-04 Thread Tzu-Li (Gordon) Tai
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the 
release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing 
this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can 
also build on the current “release-1.1” branch, which already has FLINK-4514 
merged.
Sorry for the inconvenience. Let me know if you bump into any other problems 
afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (stef...@hausmann-family.de) 
wrote:

Hi there,  

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events  
from a Kinesis stream. However, after a while (the exact duration varies  
and is in the order of minutes) the Kinesis source doesn't emit any  
further events and hence Flink doesn't produce any further output.  
Eventually, an ExpiredIteratorException occurs in one of the task,  
causing the entire job to fail:  

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator 
> expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while 
> right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future 
> than the tolerated delay of 30 milliseconds. (Service: AmazonKinesis; 
> Status Code: 400; Error Code: ExpiredIteratorException; Request ID: 
> dace9532-9031-54bc-8aa2-3cbfb136d590)  

This seems to be related to FLINK-4514, which is marked as resolved for  
Flink 1.1.2. In contrast to what is described in the ticket, the job I'm  
running isn't suspended but hangs just a few minutes after the job has  
been started.  

I've attached a log file showing the described behavior.  

Any idea what may be wrong?  

Thanks,  
Steffen  


Re: Presented Flink use case in Japan

2016-10-04 Thread Tzu-Li (Gordon) Tai
Really great to hear this!

Cheers,
Gordon


On October 4, 2016 at 8:13:27 PM, Till Rohrmann (trohrm...@apache.org) wrote:

It's always great to hear Flink success stories :-) Thanks for sharing it with 
the community. 

I hope Flink helps you to solve even more problems. And don't hesitate to reach 
out to the community whenever you stumble across some Flink problems.

Cheers,
Till

On Tue, Oct 4, 2016 at 2:04 PM, Hironori Ogibayashi  
wrote:
Hello,

Just for information.

Last week, I have presented our Flink use case in my company's conference.
(http://developers.linecorp.com/blog/?p=3992)

Here is the slide.
http://www.slideshare.net/linecorp/b-6-new-stream-processing-platformwith-apache-flink
I think the video with English subtitle will also be published soon.

The use case itself might not be very interesting, but I think this is the
first Flink production use case in Japan opened to the public.

Thank you for great software.

Regards,
Hironori Ogibayashi



Re: Flink Checkpoint runs slow for low load stream

2016-10-04 Thread Tzu-Li (Gordon) Tai
Hi,

Helping out here: this is the PR for async Kafka offset committing - 
https://github.com/apache/flink/pull/2574.
It has already been merged into the master and release-1.1 branches, so you can 
try out the changes now if you’d like.
The change should also be included in the 1.1.3 release, which the Flink 
community is discussing to release soon.

Will definitely be helpful if you can provide feedback afterwards!

Best Regards,
Gordon


On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga 
(chakravarth...@gmail.com) wrote:

Hi Stephan,

    Is the Async kafka offset commit released in 1.3.1?

Varaga

On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga  
wrote:
Hi Stephan,

 That should be great. Let me know once the fix is done and the snapshot 
version to use, I'll check and revert then.
 Can you also share the JIRA that tracks the issue?
 
 With regards to offset commit issue, I'm not sure as to how to proceed 
here. Probably I'll use your fix first and see if the problem reoccurs.

Thanks much
Varaga

On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen  wrote:
@CVP

Flink stores in checkpoints in your case only the Kafka offsets (few bytes) and 
the custom state (e).

Here is an illustration of the checkpoint and what is stored (from the Flink 
docs).
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html


I am quite puzzled why the offset committing problem occurs only for one input, 
and not for the other.
I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
Could you try out a snapshot version to see if that fixes your problem?

Greetings,
Stephan



On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga  
wrote:
Hi Stefan,

 Thanks a million for your detailed explanation. I appreciate it.

 -  The zookeeper bundled with kafka 0.9.0.1 was used to start zookeeper. 
There is only 1 instance (standalone) of zookeeper running on my localhost 
(ubuntu 14.04)
 -  There is only 1 Kafka broker (version: 0.9.0.1 )

 With regards to Flink cluster there's only 1 JM & 2 TMs started with no 
HA. I presume this does not use zookeeper anyways as it runs as standalone 
cluster.

 
 BTW., The kafka connector version that I use is as suggested in the flink 
connectors page.
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
            <version>1.1.1</version>
        </dependency>
 
 Do you see any issues with versions?
   
 1) Do you have benchmarks wrt., to checkpointing in flink?

     2) There isn't detailed explanation on what states are stored as part of 
the checkpointing process. For ex.,  If I have pipeline like source -> map -> 
keyBy -> map -> sink, my assumption on what's stored is:
 a) The source stream's custom watermarked records
 b) Intermediate states of each of the transformations in the pipeline
 c) Delta of Records stored from the previous sink
 d) Custom States (SayValueState as in my case) - Essentially this is 
what I bother about storing.
 e) All of my operators

  Is my understanding right?

 3) Is there a way in Flink to checkpoint only d) as stated above

 4) Can you apply checkpointing to only streams and certain operators (say 
I wish to store aggregated values part of the transformation)

Best Regards
CVP


On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen  wrote:
Thanks, the logs were very helpful!

TL:DR - The offset committing to ZooKeeper is very slow and prevents proper 
starting of checkpoints.

Here is what is happening in detail:

  - Between the point when the TaskManager receives the "trigger checkpoint" 
message and the point when the KafkaSource actually starts the checkpoint, a 
long time passes (many seconds) - for one of the Kafka inputs (the other is fine).
  - The only way this delay can be introduced is if another checkpoint-related 
operation (such as trigger() or notifyComplete()) is still in progress 
when the checkpoint is started. Flink does not perform concurrent checkpoint 
operations on a single operator, to ease the concurrency model for users.
  - The operation that is still in progress must be the committing of the 
offsets (to ZooKeeper or Kafka). That also explains why this only happens once 
one side receives the first record. Before that, there is nothing to commit.


What Flink should fix:
  - The KafkaConsumer should run the commit operations asynchronously, to not 
block the "notifyCheckpointComplete()" method.

What you can fix:
  - Have a look at your Kafka/ZooKeeper setup. One Kafka Input works well, the 
other does not. Do they go against different sets of brokers, or different 
ZooKeepers? Is the metadata for one input bad?
  - In the next Flink version, you may opt-out of committing offsets to 
Kafka/ZooKeeper all together. It is not important for Flink's checkpoints 
anyways.

Greetings,
Stephan


On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga  
wrote:
Hi Stefan,

    Please find my responses below.


Re: Using Flink

2016-10-04 Thread Tzu-Li (Gordon) Tai
Hi Govindarajan,

Regarding the stagnant Kakfa offsets, it’ll be helpful if you can supply more 
information for the following to help us identify the cause:
1. What is your checkpointing interval set to?
2. Did you happen to have set the “max.partition.fetch.bytes” property in the 
properties given to FlinkKafkaConsumer? I’m suspecting with some recent changes 
to the offset committing, large fetches can also affect when offsets are 
committed to Kafka.
3. I’m assuming that you’ve built the Kafka connector from source. Could you 
tell which commit it was built on?

If you could, you can also reply with the taskmanager logs (or via private 
email) so we can check in detail, that would definitely be helpful!

Best Regards,
Gordon


On October 4, 2016 at 3:51:59 PM, Till Rohrmann (trohrm...@apache.org) wrote:

Hi Govindarajan,

you can broadcast the stream with the debug logger information by calling 
`stream.broadcast()`. Then every record of that stream will be sent to all sub-tasks of 
the downstream operator.
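
For illustration, a rough sketch of that pattern; the types and names are made up (events are Tuple2<userId, payload>, control records are plain user ids), and it still uses a co-operator at the point where the logging decision is made:

---
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class DebugLoggingOperator
        implements CoFlatMapFunction<Tuple2<String, String>, String, Tuple2<String, String>> {

    private final Set<String> debugUsers = new HashSet<>();

    @Override
    public void flatMap1(Tuple2<String, String> event, Collector<Tuple2<String, String>> out) {
        if (debugUsers.contains(event.f0)) {
            System.out.println("DEBUG user " + event.f0 + ": " + event.f1);
        }
        out.collect(event);
    }

    @Override
    public void flatMap2(String userId, Collector<Tuple2<String, String>> out) {
        debugUsers.add(userId); // control event: start logging this user
    }
}

// wiring (events and control are DataStreams created elsewhere):
// events.connect(control.broadcast()).flatMap(new DebugLoggingOperator());
---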

Cheers,
Till

On Mon, Oct 3, 2016 at 5:13 PM, Govindarajan Srinivasaraghavan 
 wrote:
Hi Gordon,

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to 
the offset committing for Kafka 0.9 consumer, so identifying the exact commit 
will help clarify whether the recent change introduced any new problems. Also, 
what is your configured checkpoint interval? When checkpointing is enabled, the 
Kafka consumer only commits to Kafka when checkpoints are completed. So, 
offsets in Kafka are not updated until the next checkpoint is triggered.

I am using 1.2-SNAPSHOT
'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version: 
'1.2-SNAPSHOT'
'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version: 
'1.2-SNAPSHOT'

- In my current application we use custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will 
need to change configuration / behaviour of a specific Flink operator at 
runtime. To my knowledge, the best way to do this in Flink right now is to 
translate your original logging-trigger api requests to a stream of events fed 
to Flink. This stream of events will then basically be changes of your user 
logger behaviour, and your operators can change its logging behaviour according 
to this stream.

I can send the changes as streams, but I need this change for all the operators 
in my pipeline. Instead of using coflatmap at each operator to combine the 
streams, is there a way to send a change to all the operators?

- Can I pass on state between operators? If I need the state stored on previous 
operators, how can I fetch it?
I don’t think this is possible.
Fine, thanks.

Thanks.

On Mon, Oct 3, 2016 at 12:23 AM, Tzu-Li (Gordon) Tai  
wrote:
Hi!

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to 
the offset committing for Kafka 0.9 consumer, so identifying the exact commit 
will help clarify whether the recent change introduced any new problems. Also, 
what is your configured checkpoint interval? When checkpointing is enabled, the 
Kafka consumer only commits to Kafka when checkpoints are completed. So, 
offsets in Kafka are not updated until the next checkpoint is triggered.

- In my current application we use custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will 
need to change configuration / behaviour of a specific Flink operator at 
runtime. To my knowledge, the best way to do this in Flink right now is to 
translat

Re: Using Flink

2016-10-03 Thread Tzu-Li (Gordon) Tai
Hi!

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to 
the offset committing for Kafka 0.9 consumer, so identifying the exact commit 
will help clarify whether the recent change introduced any new problems. Also, 
what is your configured checkpoint interval? When checkpointing is enabled, the 
Kafka consumer only commits to Kafka when checkpoints are completed. So, 
offsets in Kafka are not updated until the next checkpoint is triggered.
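
As a minimal illustration (the interval value here is only an example):

---
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 60 seconds; the consumer's offsets are committed to Kafka
// each time one of these checkpoints completes.
env.enableCheckpointing(60000);
---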

- In my current application we use custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will 
need to change configuration / behaviour of a specific Flink operator at 
runtime. To my knowledge, the best way to do this in Flink right now is to 
translate your original logging-trigger api requests to a stream of events fed 
to Flink. This stream of events will then basically be changes of your user 
logger behaviour, and your operators can change its logging behaviour according 
to this stream.

- Can I pass on state between operators? If I need the state stored on previous 
operators, how can I fetch it?

I don’t think this is possible.


Best Regards,
Gordon


On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan 
(govindragh...@gmail.com) wrote:

Hi,

 

I have few questions on how I need to model my use case in flink. Please 
advise. Thanks for the help.

 

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and 
I have checkpoint enabled. When I look at the consumer offsets in kafka it 
appears to be stagnant and there is a huge lag. But I can see my flink program 
is in pace with kafka source in JMX metrics and outputs. Is there a way to 
identify why the offsets are not committed to kafka?

 

- In my current application we use custom loggers for debugging purposes. Let’s say 
we want to find what’s happening for a particular user, we fire an api request 
to add the custom logger for that particular user and use it for logging along 
the data path. Is there a way to achieve this in flink? Are there any global 
mutable parameters that I can use to achieve this functionality?

 

- Can I pass on state between operators? If I need the state stored on previous 
operators, how can I fetch it?

 

Thanks

Re: FlinkKafkaConsumer and Kafka topic/partition change

2016-09-27 Thread Tzu-Li (Gordon) Tai
Hi!

This is definitely a planned feature for the Kafka connectors, there’s a JIRA 
exactly for this [1].
We’re currently going through some blocking tasks to make this happen, I also 
hope to speed up things over there :)

Your observation is correct that the Kafka consumer uses “assign()” instead of 
“subscribe()”.
This is due to the fact that the partition-to-subtask assignment needs to be 
deterministic in Flink
for exactly-once semantics.
If you’re not concerned about exactly-once and want to experiment around for 
now before [1] comes around,
I believe Robert has recently implemented a Kafka consumer that uses 
“subscribe()”, so the Kafka
topics can scale (looping in Robert to provide more info about this one).

Best Regards,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-4022


On September 27, 2016 at 6:17:06 PM, Hironori Ogibayashi (ogibaya...@gmail.com) 
wrote:

Hello,  

I want FlinkKafkaConsumer to follow changes in Kafka topic/partition change.  
This means:  
- When we add partitions to a topic, we want FlinkKafkaConsumer to  
start reading added partitions.  
- We want to specify topics by pattern (e.g accesslog.*), and want  
FlinkKafkaConsumer to start reading new topics if they appeared after  
starting job.  

As far as I can tell from reading the source code and from my experiments, FlinkKafkaConsumer  
uses KafkaConsumer.assign() instead of subscribe(), so partitions are  
assigned to each KafkaConsumer instance just once at job start  
time.  

Is there any way to let FlinkKafkaConsumer follow topic/partition change?  

Regards,  
Hironori Ogibayashi  


Re: setting the name of a subtask ?

2016-09-12 Thread Tzu-Li (Gordon) Tai
Hi!

Yes, you can set custom operator names by calling `.name(…)` on DataStreams 
after a transformation.
For example, `.addSource(…).map(...).name(…)`. This name will be used for 
visualization on the dashboard, and also for logging.

Regards,
Gordon


On September 12, 2016 at 3:44:58 PM, Bart van Deenen 
(bartvandee...@fastmail.fm) wrote:

Hi all  

I'm using Flink 1.1 with a streaming job, consisting of a few maps and a  
few aggregations.  
In the web dashboard for the job I see subtask names like:  

TriggerWindow(SlidingEventTimeWindows(60, 5000),  
FoldingStateDescriptor{serializer=null, initialValue=Res(0,List()),  
foldFunction=org.apache.flink.streaming.api.scala.function.util.ScalaFoldFunction@5c42d2b7},
  
EventTimeTrigger(), WindowedStream.fold(WindowedStream.java:238)) ->  
Filter -> Map  

Is it possible to give this a more human readable name from my job  
program?  

Greetings  

Bart van Deenen  


Re: Kafka SimpleStringConsumer NPE

2016-09-04 Thread Tzu-Li (Gordon) Tai
Hi David,

Is it possible that your Kafka installation is an older version than 0.9? Or 
did you perhaps use a different Kafka client major version in your job jar's 
dependencies?
This looks to me like a protocol incompatibility with the Kafka broker, as 
the client inside the Kafka consumer is reading null record bytes.

Regards,
Gordon


On September 4, 2016 at 7:17:04 AM, dbciar (da...@dbciar.co.uk) wrote:

Hello Everyone, 

I was wondering if anyone could help shed light on where I have introduced 
an error into my code to get the following error: 

java.lang.NullPointerException 
at java.lang.String.<init>(String.java:556) 
at 
org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:34)
 
at 
org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:27)
 
at 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
 
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
 
at java.lang.Thread.run(Thread.java:745) 

I get this error while running a job that connects to kafka from a local 
deployment. Could it be to do with how I'm packaging the Jar before 
uploading it to the cluster? 

The job plan is created and deployed OK via the management website, but as 
soon as data is added to Kafka I get the above and the job stops. Using 
Kafka's own console consumer script, I validated the kafka queue and the 
data looks exactly like the testing data I used when reading from local 
files. 

Any help as always appreciated, 
Cheers, 
David 



-- 
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-SimpleStringConsumer-NPE-tp.html
 
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com. 


Re: Wikiedit QuickStart with Kinesis

2016-09-01 Thread Tzu-Li (Gordon) Tai
Hi Craig,

I’ve just run a simple test on this and there should be no problem.

What Flink version were you using (the archetype version used with the Flink 
Quickstart Maven Archetype)?
Also, on which branch / commit was the Kinesis connector built? Seeing that 
you’ve used the “AUTO”
credentials provider option, I’m assuming it’s built on the master branch and 
not a release branch (the “AUTO”
option wasn’t included in any of the release branches yet).

So I’m suspecting it’s due to a version conflict between the two. If yes, you 
should build the Kinesis connector
with the same release version branch as the Flink version you’re using.
Could you check and see if the problem remains? Thanks!

Regards,
Gordon


On September 1, 2016 at 1:34:19 AM, Foster, Craig (foscr...@amazon.com) wrote:

Hi:

I am using the following WikiEdit example:

https://ci.apache.org/projects/flink/flink-docs-master/quickstart/run_example_quickstart.html

 

It works when printing the contents to a file or stdout.

 

But I wanted to modify it to use Kinesis instead of Kafka. So instead of the 
Kafka part, I put:

 

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new 
SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("my-flink-stream");
kinesis.setDefaultPartition("0");

result
.map(new MapFunction<Tuple2<String, Long>, String>() {
    @Override
    public String map(Tuple2<String, Long> tuple) {
    return tuple.toString();
    }
})
.addSink(kinesis);

see.execute();

 

 

But I get the following error:

2016-08-31 17:05:41,541 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph    - Source: Custom 
Source (1/1) (2f7d339588fec18e0f2617439ee9be6d) switched from RUNNING to 
CANCELING
2016-08-31 17:05:41,542 INFO  org.apache.flink.yarn.YarnJobManager  
    - Status of job 43a13707d92da260827f37968597c187 () changed to 
FAILING.
java.lang.Exception: Serialized representation of 
org.apache.flink.streaming.runtime.tasks.TimerException: 
java.lang.RuntimeException: Could not forward element to next operator
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:803)
    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Searching Google doesn't yield many things that seem to work. Is there 
somewhere I should look for a root cause? I looked in the full log file but 
it's not much more than this stacktrace.

Re: Kinesis connector - Iterator expired exception

2016-08-26 Thread Tzu-Li (Gordon) Tai
Hi Josh,

Thanks for the description. From your description and a check into the code, 
I’m suspecting what could be happening is that before the consumer caught up to 
the head of the stream, Kinesis was somehow returning the same shard iterator 
on consecutive fetch calls, and the consumer kept on using the same one until 
it eventually timed out.

This is actually suggesting the cause is due to Kinesis-side unexpected 
behaviour, so I probably need to run some long-running tests to clarify / 
reproduce this. The constant "15 minute" fail is suggesting this too, because 
the expire time for shard iterators is actually 5 minutes (from the Kinesis 
docs) …

Either way, it should be possible to handle this in the consumer so that it 
doesn’t fail on such situations. I’ve filed up a JIRA for this: 
https://issues.apache.org/jira/browse/FLINK-4514 .
I’ll get back to you after I figure out the root cause ;)

Regards,
Gordon


On August 26, 2016 at 10:43:02 PM, Josh (jof...@gmail.com) wrote:

Hi Gordon,

My job only went down for around 2-3 hours, and I'm using the default Kinesis 
retention of 24 hours. When I restored the job, it got this exception after 
around 15 minutes (and then restarted again, and got the same exception 15 
minutes later etc) - but actually I found that after this happened around 5 
times the job fully caught up to the head of the stream and started running 
smoothly again.

Thanks for looking into this!

Best,
Josh


On Fri, Aug 26, 2016 at 1:57 PM, Tzu-Li (Gordon) Tai  
wrote:
Hi Josh,

Thank you for reporting this, I’m looking into it. There was some major changes 
to the Kinesis connector after mid June, but the changes don’t seem to be 
related to the iterator timeout, so it may be a bug that had always been there.

I’m not sure yet if it may be related, but may I ask how long was your Flink 
job down before restarting it again from the existing state? Was it longer than 
the retention duration of the Kinesis records (default is 24 hours)?

Regards,
Gordon


On August 26, 2016 at 7:20:59 PM, Josh (jof...@gmail.com) wrote:

Hi all,

I guess this is probably a question for Gordon - I've been using the 
Flink-Kinesis connector for a while now and seen this exception a couple of 
times:

com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator 
expired. The iterator was created at time Fri Aug 26 10:47:47 UTC 2016 while 
right now it is Fri Aug 26 11:05:40 UTC 2016 which is further in the future 
than the tolerated delay of 30 milliseconds. (Service: AmazonKinesis; 
Status Code: 400; Error Code: ExpiredIteratorException; Request ID: 
d3db1d90-df97-912b-83e1-3954e766bbe0)

It happens when my Flink job goes down for a couple of hours, then I restart 
from the existing state and it needs to catch up on all the data that has been 
put in Kinesis stream in the hours where the job was down. The job then runs 
for ~15 mins and fails with this exception (and this happens repeatedly - 
meaning I can't restore the job from the existing state).

Any ideas what's causing this? It's possible that it's been fixed in recent 
commits, as the version of the Kinesis connector I'm using is behind master - 
I'm not sure exactly what commit I'm using (doh!) but it was built around mid 
June.

Thanks,
Josh



Re: Kinesis connector - Iterator expired exception

2016-08-26 Thread Tzu-Li (Gordon) Tai
Hi Josh,

Thank you for reporting this, I’m looking into it. There was some major changes 
to the Kinesis connector after mid June, but the changes don’t seem to be 
related to the iterator timeout, so it may be a bug that had always been there.

I’m not sure yet if it may be related, but may I ask how long was your Flink 
job down before restarting it again from the existing state? Was it longer than 
the retention duration of the Kinesis records (default is 24 hours)?

Regards,
Gordon


On August 26, 2016 at 7:20:59 PM, Josh (jof...@gmail.com) wrote:

Hi all,

I guess this is probably a question for Gordon - I've been using the 
Flink-Kinesis connector for a while now and seen this exception a couple of 
times:

com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator 
expired. The iterator was created at time Fri Aug 26 10:47:47 UTC 2016 while 
right now it is Fri Aug 26 11:05:40 UTC 2016 which is further in the future 
than the tolerated delay of 30 milliseconds. (Service: AmazonKinesis; 
Status Code: 400; Error Code: ExpiredIteratorException; Request ID: 
d3db1d90-df97-912b-83e1-3954e766bbe0)

It happens when my Flink job goes down for a couple of hours, then I restart 
from the existing state and it needs to catch up on all the data that has been 
put in Kinesis stream in the hours where the job was down. The job then runs 
for ~15 mins and fails with this exception (and this happens repeatedly - 
meaning I can't restore the job from the existing state).

Any ideas what's causing this? It's possible that it's been fixed in recent 
commits, as the version of the Kinesis connector I'm using is behind master - 
I'm not sure exactly what commit I'm using (doh!) but it was built around mid 
June.

Thanks,
Josh

Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi Sameer,

I realized you might be a bit confused between “source instances (which in
general are Flink tasks)” and “threads” in my previous explanations. The
per-broker threads in the Kafka consumer and per-shard threads in the
Kinesis consumer I mentioned are threads created by the source instance’s
main thread. So, they have nothing to do with the assignment of
shard/partitions to the source instances. The threading models previously
explained refers to how a single source instance consumes multiple
shards/partitions that are assigned to it.

Hope this clarifies things for you more :)

Regards,
Gordon


On August 23, 2016 at 9:31:58 PM, Tzu-Li (Gordon) Tai (tzuli...@gmail.com)
wrote:

Slight misunderstanding here. The one thread per Kafka broker happens
*after* the assignment of Kafka partitions to the source instances. So,
with a total of 10 partitions and 10 source instances, each source instance
will first be assigned 1 partition. Then, each source instance will create
1 thread for every individual broker that holds partitions that the source
instance is assigned. The per-broker threading model of the Kafka consumer
has nothing to do with the initial assignment of partitions to source
instances.

Another example to explain this more clearly:
Say you have 2 Kafka brokers, and each hold 5 partitions, and have source
parallelism 5. Each source instance will still have 2 partitions. If the
2 partitions belong to the same broker, the source instance will have only
1 consuming thread; otherwise if the 2 partitions belong to different
brokers, the source instance will have 2 consuming threads.

Regards,
Gordon


On August 23, 2016 at 8:47:15 PM, Sameer W (sam...@axiomine.com) wrote:

Gordon,

I tried the following with Kafka - 1 Broker but a topic has 10 partitions.
I have a parallelism of 10 defined for the job. I see all my 10
source->Mapper->assignTimestamps receiving and sending data. If there is
only one source instance per broker how does that happen?

Thanks,
Sameer

On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi!
>
> Kinesis shards should be ideally evenly assigned to the source instances.
> So, with your example of source parallelism of 10 and 20 shards, each
> source instance will have 2 shards and will have 2 threads consuming them
> (therefore, not in round robin).
>
> For the Kafka consumer, in the source instances there will be one
> consuming thread per broker, instead of partition. So, if a source instance
> is assigned partitions that happen to be on the same broker, the source
> instance will only create 1 thread to consume all of them.
>
> You are correct that currently the Kafka consumer does not handle
> repartitioning transparently like the Kinesis connector, but we’re working
> on this :)
>
> Regards,
> Gordon
>
> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> The documentation says that there will be one thread per shard. If my
> streaming job runs with a parallelism of 10 and there are 20 shards, are
> more threads going to be launched within  a task slot running a source
> function to consume the additional shards or will one source function
> instance consume 2 shards in round robin.
>
> Is it any different for Kafka? Based on the documentation my understanding
> is that if there are 10 source function instances and 20 partitions, each
> one will read 2 partitions.
>
> Also if partitions are added to Kafka are they handled by the existing
> streaming job or does it need to be restarted? It appears as though Kinesis
> handles it via the consumer constantly checking for more shards.
>
> Thanks,
> Sameer
>
>


Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Slight misunderstanding here. The one thread per Kafka broker happens
*after* the assignment of Kafka partitions to the source instances. So,
with a total of 10 partitions and 10 source instances, each source instance
will first be assigned 1 partition. Then, each source instance will create
1 thread for every individual broker that holds partitions that the source
instance is assigned. The per-broker threading model of the Kafka consumer
has nothing to do with the initial assignment of partitions to source
instances.

Another example to explain this more clearly:
Say you have 2 Kafka brokers, each holding 5 partitions, and a source
parallelism of 5. Each source instance will still be assigned 2 partitions. If the
2 partitions belong to the same broker, the source instance will have only
1 consuming thread; otherwise, if the 2 partitions belong to different
brokers, the source instance will have 2 consuming threads.

Regards,
Gordon


On August 23, 2016 at 8:47:15 PM, Sameer W (sam...@axiomine.com) wrote:

Gordon,

I tried the following with Kafka - 1 broker, but the topic has 10 partitions.
I have a parallelism of 10 defined for the job. I see all my 10
source->Mapper->assignTimestamps receiving and sending data. If there is
only one source instance per broker, how does that happen?

Thanks,
Sameer

On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi!
>
> Kinesis shards should ideally be evenly assigned to the source instances.
> So, with your example of source parallelism of 10 and 20 shards, each
> source instance will have 2 shards and will have 2 threads consuming them
> (therefore, not in round robin).
>
> For the Kafka consumer, in the source instances there will be one
> consuming thread per broker, instead of per partition. So, if a source instance
> is assigned partitions that happen to be on the same broker, the source
> instance will only create 1 thread to consume all of them.
>
> You are correct that currently the Kafka consumer does not handle
> repartitioning transparently like the Kinesis connector, but we’re working
> on this :)
>
> Regards,
> Gordon
>
> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> The documentation says that there will be one thread per shard. If my
> streaming job runs with a parallelism of 10 and there are 20 shards, are
> more threads going to be launched within a task slot running a source
> function to consume the additional shards, or will one source function
> instance consume 2 shards in round robin?
>
> Is it any different for Kafka? Based on the documentation my understanding
> is that if there are 10 source function instances and 20 partitions, each
> one will read 2 partitions.
>
> Also if partitions are added to Kafka are they handled by the existing
> streaming job or does it need to be restarted? It appears as though Kinesis
> handles it via the consumer constantly checking for more shards.
>
> Thanks,
> Sameer
>
>


Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Tzu-Li (Gordon) Tai
No, it does not default to Ingestion Time. For other connectors in general,
you have to explicitly call `assignTimestampsAndWatermarks()` before the
first operator in the topology that works on time (e.g. windows); otherwise,
the job will fail as soon as records start coming in.

Currently, I think only the Kinesis connector and, shortly in the future,
Kafka 0.10 connector will have default timestamps when the topology uses
Event Time. Otherwise, the behaviour is described as above.
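
For illustration, a minimal sketch of assigning timestamps and watermarks
before a window, against the 1.1-era DataStream API (the Tuple2 input with
the timestamp in field f1 is just made up for the example):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // made-up input: (key, event timestamp in millis)
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                new Tuple2<>("a", 1000L), new Tuple2<>("a", 2000L), new Tuple2<>("b", 3000L));

        events
            // must come before the first time-based operator (the window below),
            // otherwise the job fails once records start flowing
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                    return element.f1;
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(10))
            .sum(1)
            .print();

        env.execute("event time sketch");
    }
}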

Regards,
Gordon


On August 23, 2016 at 7:34:25 PM, Sameer W (sam...@axiomine.com) wrote:

Thanks - Is there also a default behavior for non-Kinesis streams? If I set
the time characteristic to Event Time but do not assign timestamps or
generate watermarks by invoking the assignTimestampsAndWatermarks function,
does that default to using Ingestion Time? In other words, is it like
invoking this method on the source stream:

assignTimestampsAndWatermarks(new IngestionTimeExtractor<>())

Sameer

On Tue, Aug 23, 2016 at 7:29 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> For the Kinesis consumer, when you use Event Time but do not explicitly
> assign timestamps, the Kinesis server-side timestamp (the time which
> Kinesis received the record) is attached to the record as default, not
> Flink’s ingestion time.
>
> Does this answer your question?
>
> Regards,
> Gordon
>
>
> On August 23, 2016 at 6:42:26 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> If you do not explicitly assign timestamps and watermarks when using Event
> Time, does it automatically default to using Ingestion Time?
>
> I was reading the Kinesis integration section and came across the note
> below, which raised the above question. I saw another place where you
> explicitly use Event Time with ingestion time via the following:
> assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
>
> Does that line have to be called explicitly, or is it the default?
>
>
> "If streaming topologies choose to use the event time notion
> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html>
>  for
> record timestamps, an *approximate arrival timestamp* will be used by
> default. This timestamp is attached to records by Kinesis once they were
> successfully received and stored by streams. Note that this timestamp is
> typically referred to as a Kinesis server-side timestamp, and there are no
> guarantees about the accuracy or order correctness (i.e., the timestamps
> may not always be ascending)."
>
> Thanks,
> Sameer
>
>


Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi,

For the Kinesis consumer, when you use Event Time but do not explicitly
assign timestamps, the Kinesis server-side timestamp (the time which
Kinesis received the record) is attached to the record as default, not
Flink’s ingestion time.

Does this answer your question?

Regards,
Gordon


On August 23, 2016 at 6:42:26 PM, Sameer W (sam...@axiomine.com) wrote:

Hi,

If you do not explicitly assign timestamps and watermarks when using Event
Time, does it automatically default to using Ingestion Time?

I was reading the Kinesis integration section and came across the note
below, which raised the above question. I saw another place where you
explicitly use Event Time with ingestion time via the following:
.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

Does that line have to be called explicitly, or is it the default?


"If streaming topologies choose to use the event time notion

for
record timestamps, an *approximate arrival timestamp* will be used by
default. This timestamp is attached to records by Kinesis once they were
successfully received and stored by streams. Note that this timestamp is
typically referred to as a Kinesis server-side timestamp, and there are no
guarantees about the accuracy or order correctness (i.e., the timestamps
may not always be ascending)."

Thanks,
Sameer


Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi!

Kinesis shards should ideally be evenly assigned to the source instances.
So, with your example of source parallelism of 10 and 20 shards, each
source instance will have 2 shards and will have 2 threads consuming them
(therefore, not in round robin).

For the Kafka consumer, in the source instances there will be one consuming
thread per broker, instead of per partition. So, if a source instance is
assigned partitions that happen to be on the same broker, the source
instance will only create 1 thread to consume all of them.

You are correct that currently the Kafka consumer does not handle
repartitioning transparently like the Kinesis connector, but we’re working
on this :)

Regards,
Gordon

On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:

Hi,

The documentation says that there will be one thread per shard. If my
streaming job runs with a parallelism of 10 and there are 20 shards, are
more threads going to be launched within a task slot running a source
function to consume the additional shards, or will one source function
instance consume 2 shards in round robin?

Is it any different for Kafka? Based on the documentation my understanding
is that if there are 10 source function instances and 20 partitions, each
one will read 2 partitions.

Also if partitions are added to Kafka are they handled by the existing
streaming job or does it need to be restarted? It appears as though Kinesis
handles it via the consumer constantly checking for more shards.

Thanks,
Sameer


Re: How to get latest offsets with FlinkKafkaConsumer

2016-08-05 Thread Tzu-Li (Gordon) Tai
Hi,

Please also note that the “auto.offset.reset” property is only respected
when there are no offsets under the same consumer group in ZK. So,
currently, in order to make sure you read from the latest / earliest
offsets every time you restart your Flink application, you’d have to use a
unique groupId on each restart.

We’re currently working on new configuration for the Kafka consumer to
explicitly configure the starting offset / position without respecting
existing offsets in ZK. You can follow the corresponding JIRA here:
https://issues.apache.org/jira/browse/FLINK-4280.
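
For illustration, a minimal sketch of what this currently looks like with
the 0.8 consumer (the broker / ZooKeeper addresses are made up; the fresh
group.id on every restart is what makes "auto.offset.reset" take effect):

import java.util.Properties;
import java.util.UUID;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaLatestOffsetSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");     // made-up broker address
        props.setProperty("zookeeper.connect", "zookeeper:2181");  // required by the 0.8 consumer
        // "auto.offset.reset" is only respected when ZK holds no offsets for this group,
        // hence the fresh group id on every restart
        props.setProperty("group.id", "my-job-" + UUID.randomUUID());
        props.setProperty("auto.offset.reset", "largest");         // "smallest" for the earliest offsets

        env.addSource(new FlinkKafkaConsumer08<>("MyTopic", new SimpleStringSchema(), props))
           .print();

        env.execute("read from latest offsets sketch");
    }
}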

Regards,
Gordon

On August 5, 2016 at 8:47:32 PM, Stefan Richter (s.rich...@data-artisans.com)
wrote:

Sorry, I think you are actually asking for the largest offset in the Kafka
source, which makes it setProperty("auto.offset.reset", "largest").

On 05.08.2016 at 14:44, Stefan Richter wrote:

Hi,

I think passing properties with setProperty("auto.offset.reset",
"smallest“) to the Kafka consumer should do what you want.

Best,
Stefan


On 05.08.2016 at 14:36, Mao, Wei wrote:

I am doing some performance tests with Flink (1.0.3) + Kafka (0.8.2.2), and
I noticed that when I restart my Flink application, it reads records
starting from the latest offset that I consumed last time, not from the
latest offsets of that topic in Kafka.

So is there any way to make it read from the last offsets of broker/MyTopic
instead of consumer/MyTopic in Flink?

Thanks,
William


Re: API request to submit job takes over 1hr

2016-06-13 Thread Tzu-Li (Gordon) Tai
Hi Shannon,

Thanks for your investigation of the issue and the JIRA. There's actually a
previous JIRA for this problem already:
https://issues.apache.org/jira/browse/FLINK-4023. Would you be ok with
tracking this issue on FLINK-4023 and closing FLINK-4069 as a duplicate?
As you can see, I've also referenced FLINK-4069 on FLINK-4023 for your
additional info on the problem.

A little help with answering your last questions:
1. We're doing the partition distribution across consumers ourselves: the
Kafka consumer connector creates a Kafka client on each subtask, and each
subtask independently determines which partitions it should be in charge of
(see the sketch after this list for the general idea). There's more
information on this blog: http://data-artisans.com/kafka-flink-a-practical-how-to/,
in the last FAQ section. As Robert has mentioned, the consumer currently
depends on the fixed, ordered list of partitions sent to all subtasks so that
each of them always determines the same set of partitions to fetch from across restarts.
2. Following the above description, currently the consumer is only
subscribing to the fixed partition list queried in the constructor. So at
the moment the Flink Kafka consumer doesn't handle repartitioning of topics,
but it's definitely on the todo list for the Kafka connector and won't be
too hard to implement once querying in the consumer is resolved (perhaps
Robert can clarify this a bit more).
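
For illustration only, a tiny sketch of the kind of deterministic assignment
described in point 1 (the modulo rule is just an assumption to show the idea,
not the actual connector code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionAssignmentSketch {

    // Every subtask sees the same fixed, ordered partition list and independently
    // picks its own share with a deterministic rule (here: a simple modulo).
    static List<Integer> partitionsForSubtask(List<Integer> allPartitions,
                                              int subtaskIndex,
                                              int numSubtasks) {
        List<Integer> mine = new ArrayList<>();
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numSubtasks == subtaskIndex) {
                mine.add(allPartitions.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Integer> partitions = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        for (int subtask = 0; subtask < 5; subtask++) {
            System.out.println("subtask " + subtask + " -> "
                    + partitionsForSubtask(partitions, subtask, 5));
        }
    }
}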

Best,
Gordon



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-tp7319p7558.html


Re: S3 as streaming source

2016-06-03 Thread Tzu-Li (Gordon) Tai
Hi,

I gave it a quick test and Chiwan is right. The methods `readFile`,
`readFileStream`, and `readTextFile` on StreamExecutionEnvironment work with
the S3 scheme to stream from S3 objects.
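
For example, a minimal sketch (the bucket and path are made up, and the S3
filesystem has to be configured for the s3:// scheme):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3ReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // reads the object(s) under the given S3 path as lines of text;
        // bucket and path are hypothetical
        DataStream<String> lines = env.readTextFile("s3://my-bucket/some/path/");

        lines.print();
        env.execute("read from S3 sketch");
    }
}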



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-as-streaming-source-tp7357p7361.html


Re: S3 as streaming source

2016-06-02 Thread Tzu-Li (Gordon) Tai
Hi Soumya,

No, currently there is no standard Flink-supported S3 streaming source. As
far as I know, there isn't a public one out there yet either. The community
is open to submissions for new connectors, so if you happen to be working on
one for S3, you can file a JIRA to let us know.

Also, are you looking for an S3 streaming source that fetches S3 event
notifications (ref:
http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html), or
one streaming files / objects from S3 for a data stream program? I assume the
first, since otherwise writing Flink batch jobs would suit you better (the
batch DataSet API already supports this).



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-as-streaming-source-tp7357p7358.html


Re: Local Cluster have problem with connect to elasticsearch

2016-05-11 Thread Tzu-Li (Gordon) Tai
Hi Rafal,

From your description, it seems like Flink is complaining because it cannot
access the Elasticsearch API related dependencies as well. You'd also have
to include the following in your Maven build, as an additional item under the
maven-dependency-plugin's <artifactItems>:

<artifactItem>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.3.2</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/elasticsearch/**</includes>
</artifactItem>

Now your built jar should correctly include all required dependencies (the
connector & Elasticsearch API).

As explained in "Linking with modules not contained in the binary
distribution" in the documentation, it is enough to package the dependencies
along with your code for Flink to access everything it needs, and you wouldn't
need to copy the jar to the lib folder. I would recommend cleaning the
previously copied jars out of the lib folder and following this approach in
the future, just in case they mess up the classloader.

As for your first attempt, where Flink cannot find any Elasticsearch nodes
when executed in the IDE, I suspect the reason is that the elasticsearch2
connector by default uses version 2.2.1, which is lower than your cluster
version 2.3.2. I have previously seen Elasticsearch strangely complain about
not finding any nodes when the client version is lower than the deployment's.
Can you try compiling the elasticsearch2 connector with the option
-Delasticsearch.version=2.3.2, and use the newly built connector jar,
following the same method mentioned above?

Hope this helps!

Cheers,
Gordon



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Local-Cluster-have-problem-with-connect-to-elasticsearch-tp6788p6838.html


Re: Flink DataStream and KeyBy

2016-01-13 Thread Tzu-Li (Gordon) Tai
Hi Saiph,

In Flink, the key for keyBy() can be provided in different ways:
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#specifying-keys
(the doc is for DataSet API, but specifying keys is basically the same for
DataStream and DataSet).

As described in the documentation, calls like keyBy(0) are meant for Tuples,
so they only work for DataStream[Tuple]. Other key definitions, like
keyBy(new KeySelector() {...}), can basically take a DataStream of any
data type. Flink determines at runtime whether there is a conflict between
the type of the data in the DataStream and the way the key is defined.
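
For illustration, a minimal sketch of both styles (the Tuple2 contents and
the Event POJO are made up for the example):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyBySketch {

    // a hypothetical POJO
    public static class Event {
        public String userId;
        public long count;

        public Event() {}
        public Event(String userId, long count) { this.userId = userId; this.count = count; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // positional keys only make sense for tuple streams
        DataStream<Tuple2<String, Integer>> tuples =
                env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
        tuples.keyBy(0).sum(1).print();

        // a KeySelector works for any data type
        DataStream<Event> events = env.fromElements(new Event("a", 1L), new Event("b", 2L));
        events.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event e) {
                return e.userId;
            }
        }).sum("count").print();

        env.execute("keyBy sketch");
    }
}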

Hope this helps!

Cheers,
Gordon





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-DataStream-and-KeyBy-tp4271p4272.html


Re: Checkpoint for exact-once proccessing

2016-01-13 Thread Tzu-Li (Gordon) Tai
Hi Francis,

Part of every complete snapshot is the record positions associated with
the barrier that triggered the checkpointing of this snapshot. The snapshot
is completed only when all the records within the checkpoint reach the
sink. When a topology fails, all the operators' state falls back to the
latest complete snapshot (incomplete snapshots are ignored). The data
source also falls back to the positions recorded with this snapshot, so
even if some records are read again after the restore, the restored
operators' state is clean of their effects. This way, Flink guarantees
exactly-once effects of each record on every operator's state. The user
functions in operators do not need to be idempotent.
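
For completeness, a minimal sketch of enabling the checkpointing that produces
these snapshots (the interval and the dummy pipeline are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // trigger a snapshot (checkpoint barrier) every 5 seconds;
        // operator state and source positions are captured together
        env.enableCheckpointing(5000);

        // a trivial placeholder pipeline so the sketch runs end to end
        env.fromElements(1, 2, 3).print();

        env.execute("checkpointed job sketch");
    }
}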

Hope this helps answer your question!

Cheers,
Gordon



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-for-exact-once-proccessing-tp4261p4264.html


Re: Flink with Yarn

2016-01-11 Thread Tzu-Li (Gordon) Tai
Hi Sourav,

A little more clarification on your last comment.

In the sense of "where" the driver program is executed, yes, the Flink
driver program runs in a mode similar to Spark's YARN-client.

However, the "role" of the driver program and the work that it is
responsible of is quite different between Flink and Spark. In Spark, the
driver program is in charge of coordinating Spark workers (executors) and
must listen for and accept incoming connections from the workers throughout
the job's lifetime. Therefore, in Spark's YARN-client mode, you must keep
the driver program process alive otherwise the job will be shutdown.

However, in Flink, the coordination of Flink TaskManagers to complete a job
is handled by Flink's JobManager once the client at the driver program
submits the job to the JobManager. The driver program is solely used for the
job submission and can disconnect afterwards. 

As Stephan explained, if the user-defined dataflow defines any
intermediate results to be retrieved via collect() or print(), the results
are transmitted through the JobManager. Only then does the driver program
need to stay connected. Note that even then, the driver program does not
need any connection to the workers (Flink TaskManagers), only to the
JobManager.

Cheers,
Gordon



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-with-Yarn-tp4224p4227.html


Re: Kinesis Connector

2016-01-08 Thread Tzu-Li (Gordon) Tai
Hi Giancarlo,

Since it has been a while since the last post and there hasn't been a JIRA
ticket opened for the Kinesis connector yet, I'm wondering how you are doing on
the Kinesis connector and hope to help out with this feature :)

I've opened a JIRA (https://issues.apache.org/jira/browse/FLINK-3211),
finished the Kinesis sink, and am halfway through the Kinesis consumer. Would
you like to merge our current efforts so that we can complete this feature
ASAP for the AWS user community?

Thankfully,
Gordon



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-tp2872p4206.html

