Re:Re: ElasticsearchSink on DataSet

2017-05-08 Thread Tzu-Li (Gordon) Tai
Hi!

Thanks for sharing that repo! I think that would be quite an useful 
contribution to Flink for the users, if you’re up to preparing a PR for it :)

It also looks like you’ve adopted most of the current ElasticsearchSink APIs 
(RequestIndexer, ElasticsearchSinkFunction, etc.) for the 
ElasticsearchOutputFormat, which is nice to fit into the current code :-D

Cheers,
Gordon


On 9 May 2017 at 1:05:14 PM, wyphao.2007 (wyphao.2...@163.com) wrote:

Hi Flavio

Maybe this is what you want: 
https://github.com/397090770/flink-elasticsearch2-connector, It can save Flink 
DataSet to elasticsearch.
import scala.collection.JavaConversions._
val config = Map("bulk.flush.max.actions" -> "1000", "cluster.name" -> 
"elasticsearch")val hosts = "www.iteblog.com"val transports = 
hosts.split(",").map(host => new InetSocketAddress(InetAddress.getByName(host), 
9300)).toListval data : DataSet[String] = 
data.output(new ElasticSearchOutputFormat(config, transports, new 
ElasticsearchSinkFunction[String] {  def createIndexRequest(element: 
String): IndexRequest = {
Requests.indexRequest.index("iteblog").`type`("info").source(element)
  }  override def process(element: String, ctx: RuntimeContext, 
indexer: RequestIndexer) {
indexer.add(createIndexRequest(element))
  }
}))

I hope this could help you

在2017年05月09 12时59分, "Tzu-Li (Gordon) Tai"写道:

Hi Flavio,

I don’t think there is a bridge class for this. At the moment you’ll have to 
implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction which is part of the DataStream API, 
which generally speaking at the moment has no bridge or unification yet with 
the DataSet API.

Cheers,
Gordon


On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:


Hi to all,
at the moment I have a Flink Job that generates a DataSet that I write 
to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from 
Flink but ElasticsearchSink only works with streaming environment.

Is there any bridge class for this?

Best,
Flavio

Re: Writing a reliable Flink source for a NON-replay-able queue/protocol that supports message ACKs

2017-05-08 Thread Tzu-Li (Gordon) Tai
Hi Martin!

Let me try to follow-up some of your questions :)

a. When the acknowledgeIDs method is called, is it certain that all the rest of 
the operators, including the Sinks finished successfully? E.g: If I have a sink 
that writes to MySQL/Cassandra and one that writes to SQS/Kafka, will the 
writes to both of these systems have been completed successfully before 
acknowledgeIDs is called?
That is correct. The `MessageAcknowledgingSourceBase#acknowledgeIDs` is 
basically wrapped within a `notifyCheckpointComplete()` call. Checkpoints are 
only notified to be completed when all sinks have finished their snapshot for 
the checkpoint.

For sinks like Cassandra and Kafka, currently what the snapshot method does is 
flush all in-flight pending requests to write to the external system. So yes, 
you can be sure that the writes for the acknowledged IDs have been completed by 
all sinks.

b. Messages can be duplicated in case the processing takes longer than the 
queue timeout or if there are failures and Flink needs to recover. This is a 
problem for sinks that write to non-idempotent systems e.g. SQS, Kafka. What 
are the recommended approaches to avoid this duplication? Are there any example 
repos?
Duplication is a general problem for non-idempotent pipeline setups, and is not 
easy to avoid when the external sink does not support transactions. I’m not 
aware of any cookbook solutions for this.

a. Is there some better api for fine grained killing of various services, 
tasks, resources in a Flink cluster or job?
b. Can you point me to a repo with reliability tests for Flink i.e. where 
things are killed to see whether the system recovers etc?
The Flink Kafka consumer actually has quite a few tests for exactly-once 
guarantees. You can take a look at them here [1]. Specifically, take a look at 
the `testOneToOneSources`, `testOneSourceMultiplePartitions`, etc. tests. I 
think they are quite good examples of how to write tests for exactly-once 
testing.



- Gordon

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java



On 8 May 2017 at 1:08:44 PM, Martin Eden (martineden...@gmail.com) wrote:

Hi Kostas,

Thanks for pointing me in the right direction.

I have gone and extended MessageAcknowledgingSourceBase. It was quite easy to 
do.

I have however some follow-up questions about the guarantees it gives and 
testing my solution.

1. Guarantees:

Questions:
a. When the acknowledgeIDs method is called, is it certain that all the rest of 
the operators, including the Sinks finished successfully? E.g: If I have a sink 
that writes to MySQL/Cassandra and one that writes to SQS/Kafka, will the 
writes to both of these systems have been completed successfully before 
acknowledgeIDs is called?
b. Messages can be duplicated in case the processing takes longer than the 
queue timeout or if there are failures and Flink needs to recover. This is a 
problem for sinks that write to non-idempotent systems e.g. SQS, Kafka. What 
are the recommended approaches to avoid this duplication? Are there any example 
repos?

2. Testing:

Work done so far:
In order to convince myself that indeed this source is reliable, I wrote some 
integration tests. I used LocalFlinkMiniCluster which is quite nice. However 
when I tried to test what happens when I kill the TaskManager running the task 
that is executing my MessageAcknowledgingSourceBase I found it not so straight 
forward. I managed to get around it by starting the cluster and the job, 
getting all the task manager actor references, adding a new task manager to the 
cluster, killing the initial task managers by sending a poison pill actor msg. 
I had to kill all the initial task managers as I did not find a way to get a 
mapping between task running the source and the task manager actor to which it 
got assigned.

Questions:
a. Is there some better api for fine grained killing of various services, 
tasks, resources in a Flink cluster or job?
b. Can you point me to a repo with reliability tests for Flink i.e. where 
things are killed to see whether the system recovers etc?

Thanks,
M


On Tue, Apr 25, 2017 at 9:23 AM, Kostas Kloudas  
wrote:
Hi Martin!

For an example of a source that acknowledges received messages, you could check 
the MessageAcknowledgingSourceBase
and the MultipleIdsMessageAcknowledgingSourceBase that ship with Flink. I hope 
this will give you some ideas.

Now for the Flink version on top of which to implement your source, I would 
suggest the Flink 1.3. The reason is that it will
come out soon (~1 month) and it will include a lot of new features and 
bug-fixes. Until then, it may change a bit, but the APIs
that you will be using, will not change.

So why not going straight for the more future-proof way?

Thanks,
Kostas

> On Apr 24, 2017, at 11:20 PM, Martin Eden  wrote:
>
> Hi everyone,

Re: Job ID

2017-05-08 Thread Tzu-Li (Gordon) Tai
Hi Joe,

AFAIK, this currently isn’t possible through the DataStream API.
You’ll be able to get a JobExecutionResult which contains the job id from the 
execute() call, but that’s blocked until the streaming job finishes.

There are plans for a new DataStream client that allows querying job info (job 
id, current accumulator results, etc.), perhaps that is something you’re 
looking for?
As of now, I think querying the REST interface would be the way to go.

Cheers,
Gordon

On 9 May 2017 at 11:47:43 AM, Joe Olson (jo4...@outlook.com) wrote:

I've got a job name, and need the job id. Is there a way to get this via the 
java API? I know I can get it via the rest interface. Is there an equivalent 
API call in the streaming API?

If not, I'll continue to use the rest interface.

Re:Re: ElasticsearchSink on DataSet

2017-05-08 Thread wyphao.2007
Hi Flavio


Maybe this is what you want: 
https://github.com/397090770/flink-elasticsearch2-connector, It can save Flink 
DataSet to elasticsearch.
importscala.collection.JavaConversions._
valconfig=Map("bulk.flush.max.actions"->"1000", 
"cluster.name"->"elasticsearch")valhosts="www.iteblog.com"valtransports= 
hosts.split(",").map(host =>newInetSocketAddress(InetAddress.getByName(host), 
9300)).toListvaldata:DataSet[String] = 
data.output(newElasticSearchOutputFormat(config, transports, 
newElasticsearchSinkFunction[String] {  defcreateIndexRequest(element: 
String):IndexRequest= {
Requests.indexRequest.index("iteblog").`type`("info").source(element)
  }  overridedefprocess(element: String, ctx: RuntimeContext, indexer: 
RequestIndexer) {
indexer.add(createIndexRequest(element))
  }
}))


I hope this could help you


在2017年05月09 12时59分, "Tzu-Li (Gordon) Tai"写道:


Hi Flavio,


I don’t think there is a bridge class for this. At the moment you’ll have to 
implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction which is part of the DataStream API, 
which generally speaking at the moment has no bridge or unification yet with 
the DataSet API.


Cheers,
Gordon





On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:



Hi to all,
at the moment I have a Flink Job that generates a DataSet that I write 
to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from 
Flink but ElasticsearchSink only works with streaming environment.



Is there any bridge class for this?


Best,
Flavio

Re: Flink + Kafka + avro example

2017-05-08 Thread Tzu-Li (Gordon) Tai
Thanks a lot for sharing this Flavio!


On 5 May 2017 at 10:45:38 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi to all Flink users,
we've just published on our Okkam public repository an example of using Flink 
1.2.1 + Kafka 0.10 to exchange Avro objects[1].

We hope this could be helpful for new Flink users willing to play with Flink 
streaming.

[1] 
https://github.com/okkam-it/flink-examples/blob/master/src/main/java/org/okkam/flink/KafkaFlinkAvroParquet.java

Best,
Flavio

Re: ElasticsearchSink on DataSet

2017-05-08 Thread Tzu-Li (Gordon) Tai
Hi Flavio,

I don’t think there is a bridge class for this. At the moment you’ll have to 
implement your own OutputFormat.
The ElasticsearchSink is a SinkFunction which is part of the DataStream API, 
which generally speaking at the moment has no bridge or unification yet with 
the DataSet API.

Cheers,
Gordon


On 3 May 2017 at 10:15:51 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:


Hi to all,
at the moment I have a Flink Job that generates a DataSet that I write 
to a File that is read by Logstash to index data on ES.
I'd like to use the new ElasticsearchSink to index those JSON directly from 
Flink but ElasticsearchSink only works with streaming environment.

Is there any bridge class for this?

Best,
Flavio

Job ID

2017-05-08 Thread Joe Olson
I've got a job name, and need the job id. Is there a way to get this via the 
java API? I know I can get it via the rest interface. Is there an equivalent 
API call in the streaming API?

If not, I'll continue to use the rest interface.


Re: [DISCUSS] Release 1.3.0 RC0 (Non voting, testing release candidate)

2017-05-08 Thread Renjie Liu
Hi, does this include the FLIP6?

On Tue, May 9, 2017 at 2:29 AM Stephan Ewen  wrote:

> Did a quick test: Simply adding the
> "org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"
> helps with NOTICE files,
> but does not add the required BSD licence copies.
>
>
> On Mon, May 8, 2017 at 8:25 PM, Stephan Ewen  wrote:
>
>> I did the first pass for the legal check.
>>
>>   - Source LICENSE and NOTICE are okay
>>
>>   - In the shaded JAR files, we are not bundling the license and notice
>> files of the dependencies we include in the shaded jars.
>>  => Not a problem for Guava (Apache Licensed)
>>  => I think is a problem for ASM (redistribution in binary form,
>> hence needs a notice of the copy)
>>
>>   - The Table API / SQL module needs more entries for Janino /
>> Reflections (both BSD licensed)
>>
>> So that is definitely a blocker.
>>
>>
>> On Mon, May 8, 2017 at 12:14 PM, Robert Metzger 
>> wrote:
>>
>>> Hi Devs,
>>>
>>> I've created a first non-voting release candidate for Flink 1.3.0.
>>> Please use this RC to test as much as you can and provide feedback to
>>> the Flink community. The more we find and fix now, the better Flink 1.3.0
>>> wil be :)
>>>
>>> I've CC'ed the user@ mailing list to get more people to test it. DO NOT
>>> USE THIS RELEASE CANDIDATE IN PRODUCTION.
>>>
>>> I will prepare a google document to synchronize the testing effort a bit
>>> more.
>>>
>>> Depending on the number of issues we identify, I hope that we can do the
>>> first VOTEing RC early next week.
>>>
>>> -
>>>
>>> The release commit is f94c002991dcce9f1104f8e61b43efb2f8247cb4, located
>>> here: http://git-wip-us.apache.org/repos/asf/flink/commit/f94c0029
>>>
>>> The artifacts are located here:
>>> http://people.apache.org/~rmetzger/flink-1.3.0-rc0/
>>>
>>> The maven staging repository is here:
>>> https://repository.apache.org/content/repositories/orgapacheflink-1118
>>>
>>> -
>>>
>>> Happy testing!
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>
> --
Liu, Renjie
Software Engineer, MVAD


Joining on multiple row values produced by TableFunction

2017-05-08 Thread Samuel Doyle
I want to do something like the following

.join("fields(fields) as (name, content)")
.where("text = 'password for user' && name='text' &&
!content.like('%accepted%') && name='appname' && content.like('%hostd%')")

Fields collects 4 rows in this case which contain those values

This doesn't work with flink 1.2. Is there a way to accomplish this?
…

-- 

Sent from my phone


AsyncCollector Does not release the thread (1.2.1)

2017-05-08 Thread Steve Robert
Hi guys,

AsyncCollector.collect(Throwable) method  seem to  not release  the Thread.
This scenario may be problematic when calling an external API
In the case of a timeout error there is no data to collect.

for example :

  CompletableFuture.supplyAsync(() -> asyncCallTask(input))
.thenAccept((Collection> result) -> {

this.tupleEmited.getAndIncrement();

asyncCollector.collect(result);
})
.exceptionally((ex) -> {
asyncCollector.collect(ex);
return null;
});
}

it is possible to create an empty Collection and collect this empty
collection to force the Thread to be released but this workflow seems
strange to me.
thank for your help


-- 
Steve Robert 
Software Engineer
srob...@qualys.com
T
Qualys, Inc. – Continuous Security
Blog  | Community  |
Twitter 



Re: Question on checkpoint management

2017-05-08 Thread Stefan Richter
I think this jira is helpful for your question: 
https://issues.apache.org/jira/browse/FLINK-6328 


> Am 08.05.2017 um 19:33 schrieb Cliff Resnick :
> 
> When a job cancel-with-savepoint finishes a successful Savepoint, the 
> preceding last successful Checkpoint is removed. Is this the intended 
> behavior? I thought that checkpoints and savepoints were separate entities 
> and, as such, savepoints should not infringe on checkpoints. This is actually 
> an issue for us because we have seen occurrences of false-positive successful 
> savepoints, perhaps due to S3 latency. Bottom line, we'd like to treat 
> savepoints as insurance rather than the critical path and would rather they 
> be oblivious to checkpoint management.
> 
> We are using externalized checkpoints, which may be confusing things. Also I 
> know checkpoint management is undergoing some changes in Flink 1.3 (we are on 
> Flink 1.2.0). Any insight is greatly appreciated.
> 
> 



Re: [DISCUSS] Release 1.3.0 RC0 (Non voting, testing release candidate)

2017-05-08 Thread Stephan Ewen
Did a quick test: Simply adding the
"org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"
helps with NOTICE files,
but does not add the required BSD licence copies.


On Mon, May 8, 2017 at 8:25 PM, Stephan Ewen  wrote:

> I did the first pass for the legal check.
>
>   - Source LICENSE and NOTICE are okay
>
>   - In the shaded JAR files, we are not bundling the license and notice
> files of the dependencies we include in the shaded jars.
>  => Not a problem for Guava (Apache Licensed)
>  => I think is a problem for ASM (redistribution in binary form, hence
> needs a notice of the copy)
>
>   - The Table API / SQL module needs more entries for Janino / Reflections
> (both BSD licensed)
>
> So that is definitely a blocker.
>
>
> On Mon, May 8, 2017 at 12:14 PM, Robert Metzger 
> wrote:
>
>> Hi Devs,
>>
>> I've created a first non-voting release candidate for Flink 1.3.0.
>> Please use this RC to test as much as you can and provide feedback to the
>> Flink community. The more we find and fix now, the better Flink 1.3.0 wil
>> be :)
>>
>> I've CC'ed the user@ mailing list to get more people to test it. DO NOT
>> USE THIS RELEASE CANDIDATE IN PRODUCTION.
>>
>> I will prepare a google document to synchronize the testing effort a bit
>> more.
>>
>> Depending on the number of issues we identify, I hope that we can do the
>> first VOTEing RC early next week.
>>
>> -
>>
>> The release commit is f94c002991dcce9f1104f8e61b43efb2f8247cb4, located
>> here: http://git-wip-us.apache.org/repos/asf/flink/commit/f94c0029
>>
>> The artifacts are located here: http://people.apache.org
>> /~rmetzger/flink-1.3.0-rc0/
>>
>> The maven staging repository is here: https://repository.apach
>> e.org/content/repositories/orgapacheflink-1118
>>
>> -
>>
>> Happy testing!
>>
>> Regards,
>> Robert
>>
>>
>


Re: [DISCUSS] Release 1.3.0 RC0 (Non voting, testing release candidate)

2017-05-08 Thread Stephan Ewen
I did the first pass for the legal check.

  - Source LICENSE and NOTICE are okay

  - In the shaded JAR files, we are not bundling the license and notice
files of the dependencies we include in the shaded jars.
 => Not a problem for Guava (Apache Licensed)
 => I think is a problem for ASM (redistribution in binary form, hence
needs a notice of the copy)

  - The Table API / SQL module needs more entries for Janino / Reflections
(both BSD licensed)

So that is definitely a blocker.


On Mon, May 8, 2017 at 12:14 PM, Robert Metzger  wrote:

> Hi Devs,
>
> I've created a first non-voting release candidate for Flink 1.3.0.
> Please use this RC to test as much as you can and provide feedback to the
> Flink community. The more we find and fix now, the better Flink 1.3.0 wil
> be :)
>
> I've CC'ed the user@ mailing list to get more people to test it. DO NOT
> USE THIS RELEASE CANDIDATE IN PRODUCTION.
>
> I will prepare a google document to synchronize the testing effort a bit
> more.
>
> Depending on the number of issues we identify, I hope that we can do the
> first VOTEing RC early next week.
>
> -
>
> The release commit is f94c002991dcce9f1104f8e61b43efb2f8247cb4, located
> here: http://git-wip-us.apache.org/repos/asf/flink/commit/f94c0029
>
> The artifacts are located here: http://people.apache.
> org/~rmetzger/flink-1.3.0-rc0/
>
> The maven staging repository is here: https://repository.
> apache.org/content/repositories/orgapacheflink-1118
>
> -
>
> Happy testing!
>
> Regards,
> Robert
>
>


Re: RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null

2017-05-08 Thread Kaepke, Marc
Hi,

did some had an answer or solution?

Best
Marc

Am 05.05.2017 um 20:05 schrieb Kaepke, Marc 
>:

Hi everyone,

what does mean that following exception, if I run my gelly program?

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:900)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: null
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at 
org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:203)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at 
org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
due to an exception: null
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.NullPointerException
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1230)
at java.util.ArrayList$SubList.size(ArrayList.java:1040)
at java.util.AbstractList.add(AbstractList.java:108)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:973)
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Process finished with exit code 1
—

During an iteration (GatherFunction) I will update a custom class


if(!allNewSemiClusters.isEmpty()) {
for(SemiCluster semiCluster : allNewSemiClusters) {
   existingSemiClusters.add(semiCluster);
}

I don’t get the error above, if I comment out the second to last line.


Any ideas?


Best,
Marc





Question on checkpoint management

2017-05-08 Thread Cliff Resnick
When a job cancel-with-savepoint finishes a successful Savepoint, the
preceding last successful Checkpoint is removed. Is this the intended
behavior? I thought that checkpoints and savepoints were separate entities
and, as such, savepoints should not infringe on checkpoints. This is
actually an issue for us because we have seen occurrences of false-positive
successful savepoints, perhaps due to S3 latency. Bottom line, we'd like to
treat savepoints as insurance rather than the critical path and would
rather they be oblivious to checkpoint management.

We are using externalized checkpoints, which may be confusing things. Also
I know checkpoint management is undergoing some changes in Flink 1.3 (we
are on Flink 1.2.0). Any insight is greatly appreciated.


Re: Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-08 Thread Aljoscha Krettek
It seems that eventTime is a static field in TopicPojo and the key selector 
also just gets the static field via TopicPojo.getEventTime(). Why is that? 
Because with this the event time basically has nothing to do with the data.

> On 5. May 2017, at 10:32, G.S.Vijay Raajaa  wrote:
> 
> I tried the timestamp field as a string datatype as well as a Date object. 
> Getting same error in both the cases;
> 
> Please find the POJO file:
> 
> import java.text.DateFormat;
> 
> import java.text.ParseException;
> 
> import java.text.SimpleDateFormat;
> 
> import java.util.Date;
> 
> import java.util.HashMap;
> 
> import java.util.List;
> 
> import java.util.Map;
> 
> import com.fasterxml.jackson.annotation.JsonAnyGetter;
> 
> import com.fasterxml.jackson.annotation.JsonAnySetter;
> 
> import com.fasterxml.jackson.annotation.JsonFormat;
> 
> import com.fasterxml.jackson.annotation.JsonIgnore;
> 
> import com.fasterxml.jackson.annotation.JsonInclude;
> 
> import com.fasterxml.jackson.annotation.JsonProperty;
> 
> import com.fasterxml.jackson.annotation.JsonPropertyOrder;
> 
> import org.apache.commons.lang.builder.ToStringBuilder;
> 
> 
> 
> @JsonPropertyOrder({
> 
> "data",
> 
> "label",
> 
> "eventTime"
> 
> })
> 
> public class TopicPojo {
> 
> 
> 
> @JsonProperty("data")
> 
> private List data = null;
> 
> @JsonProperty("label")
> 
> private List label = null;
> 
> @JsonProperty("eventTime")
> 
> private  static  Date eventTime;
> 
> 
> 
> /**
> 
> * No args constructor for use in serialization
> 
> * 
> 
> */
> 
> public TopicPojo() {
> 
> }
> 
> 
> 
> /**
> 
> * 
> 
> * @param data
> 
> * @param label
> 
> * @param eventTime
> 
> */
> 
> public SammonsPojo(List data, List label, Date 
> eventTime) {
> 
> super();
> 
> this.data = data;
> 
> this.label = label;
> 
> this.eventTime = eventTime;
> 
> }
> 
> 
> 
> @JsonProperty("data")
> 
> public List getData() {
> 
> return data;
> 
> }
> 
> 
> 
> @JsonProperty("data")
> 
> public void setData(List data) {
> 
> this.data = data;
> 
> }
> 
> 
> 
> @JsonProperty("label")
> 
> public List getLabel() {
> 
> return label;
> 
> }
> 
> 
> 
> @JsonProperty("label")
> 
> public void setLabel(List label) {
> 
> this.label = label;
> 
> }
> 
> 
> 
> @JsonProperty("eventTime")
> 
> public static Date getEventTime() {
> 
> return eventTime;
> 
> }
> 
> 
> 
> @JsonProperty("eventTime")
> 
> public void setEventTime(Date eventTime) {
> 
> this.eventTime = eventTime;
> 
> }
> 
> 
> 
> @Override
> 
> public String toString() {
> 
> return ToStringBuilder.reflectionToString(this);
> 
> }
> 
> 
> 
> }
> 
> 
> 
> The above code pertains to eventTime as Date object , tried them as String as 
> well.
> 
> Regards,
> 
> Vijay Raajaa G S
> 
> 
> On Fri, May 5, 2017 at 1:59 PM, Aljoscha Krettek  > wrote:
> What’s the KeySelector you’re using? To me, this indicates that the timestamp 
> field is somehow changing after the original keying or in transit.
> 
> Best.
> Aljoscha
>> On 4. May 2017, at 22:01, G.S.Vijay Raajaa > > wrote:
>> 
>> I tried to reorder and the window function works fine. but then after 
>> processing few stream of data from Topic A and Topic B, the window function 
>> seem to throw the below error. The keyby is on eventTime field.
>> 
>> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>> 
>> at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>> 
>> at 
>> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>> 
>> at 
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>> 
>> at org.apache.flink.streaming.runtime.io 
>> .StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>> 
>> at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>> 
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>> 
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>> 
>> at java.lang.Thread.run(Thread.java:745)
>> 
>> 
>> 
>> Regards,
>> 
>> Vijay Raajaa GS 
>> 
>> 
>> On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa > > wrote:
>> Thanks for your input, will try to incorporate them in my implementation.
>> 
>> Regards,
>> Vijay Raajaa G S
>> 
>> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek > > wrote:
>> The approach could work, but if it can happen that an event from stream A is 
>> not matched by an event in stream B you will have lingering state that never 
>> goes away. For such cases it might be better to write a custom 
>> CoProcessFunction as sketched here: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html

Re: High Availability on Yarn

2017-05-08 Thread Jain, Ankit
Thanks Stephan – we will go with a central ZooKeeper Instance and hopefully 
have it started through a cloudformation script as part of EMR startup.

Is Zk also used to keep track of checkpoint metadata and the execution graph of 
the running job to recover from ApplicationMaster failure as Aljoscha was 
guessing below or only for leader election in case of accidently running 
multiple Application Masters ?

Thanks
Ankit

From: Stephan Ewen 
Date: Monday, May 8, 2017 at 9:00 AM
To: "user@flink.apache.org" , "Jain, Ankit" 

Subject: Re: High Availability on Yarn

@Ankit:

ZooKeeper is required in YARN setups still. Even if there is only one 
JobManager in the normal case, Yarn can accidentally create a second one when 
there is a network partition.
To prevent that this leads to inconsistencies, we use ZooKeeper.

Flink uses ZooKeeper very little, so you can just let Flink attach to any 
existing ZooKeeper, or user one ZooKeeper cluster for very many Flink 
clusters/jobs.

Stephan


On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek 
> wrote:
Hi,
Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.

Best,
Aljoscha

On 5. May 2017, at 16:56, Jain, Ankit 
> wrote:

Thanks for the update Aljoscha.

@Till Rohrmann,
Can you please chim in?

Also, we currently have a long running EMR cluster where we create one flink 
cluster per job – can we just choose to install Zookeeper when creating the EMR 
cluster and use one Zookeeper instance for ALL of flink jobs?
Or
Recommendation is to have a dedicated Zookeeper instance per flink job?

Thanks
Ankit

From: Aljoscha Krettek >
Date: Thursday, May 4, 2017 at 1:19 AM
To: "Jain, Ankit" >
Cc: "user@flink.apache.org" 
>, Till Rohrmann 
>
Subject: Re: High Availability on Yarn

Hi,
Yes, for YARN there is only one running JobManager. As far as I Know, In this 
case ZooKeeper is only used to keep track of checkpoint metadata and the 
execution graph of the running job. Such that a restoring JobManager can pick 
up the data again.

I’m not 100 % sure on this, though, so maybe Till can shed some light on this.

Best,
Aljoscha
On 3. May 2017, at 16:58, Jain, Ankit 
> wrote:

Thanks for your reply Aljoscha.

After building better understanding of Yarn and spending copious amount of time 
on Flink codebase, I think I now get how Flink & Yarn interact – I plan to 
document this soon in case it could help somebody starting afresh with 
Flink-Yarn.

Regarding Zookeper, in YARN mode there is only one JobManager running, do we 
still need leader election?

If the ApplicationMaster goes down (where JM runs) it is restarted by Yarn RM 
and while restarting, Flink AM will bring back previous running containers.  
So, where does Zookeeper sit in this setup?

Thanks
Ankit

From: Aljoscha Krettek >
Date: Wednesday, May 3, 2017 at 2:05 AM
To: "Jain, Ankit" >
Cc: "user@flink.apache.org" 
>, Till Rohrmann 
>
Subject: Re: High Availability on Yarn

Hi,
As a first comment, the work mentioned in the FLIP-6 doc you linked is still 
work-in-progress. You cannot use these abstractions yet without going into the 
code and setting up a cluster “by hand”.

The documentation for one-step deployment of a Job to YARN is available here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/yarn_setup.html#run-a-single-flink-job-on-yarn

Regarding your third question, ZooKeeper is mostly used for discovery and 
leader election. That is, JobManagers use it to decide who is the main JM and 
who are standby JMs. TaskManagers use it to discover the leading JobManager 
that they should connect to.

I’m also cc’ing Till, who should know this stuff better and can maybe explain 
it in a bit more detail.

Best,
Aljoscha
On 1. May 2017, at 18:59, Jain, Ankit 
> wrote:

Hi fellow users,
We are trying to straighten out high availability story for flink.

Our setup includes a long running EMR cluster, job submission is a two-step 
process – 1) Flink 

Re: Stopping the job with ID XXX failed.

2017-05-08 Thread Stefan Richter
Hi,

what you intend to do is cancel in Flink terminology, not stop. So you should 
use the cancel command instead of the stop. Please take a look here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/cli.html 
 .

Best,
Stefan

> Am 08.05.2017 um 14:02 schrieb yunfan123 :
> 
> I can't stop the job, every time the exception like follows.
> 
> Retrieving JobManager.
> Using address /10.6.192.141:6123 to connect to JobManager.
> 
> 
> The program finished with the following exception:
> 
> java.lang.Exception: Stopping the job with ID
> 7f5a5f95353a2b486572f4cdefa813b8 failed.
>   at org.apache.flink.client.CliFrontend.stop(CliFrontend.java:528)
>   at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1087)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1126)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1123)
>   at
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>   at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1123)
> Caused by: java.lang.IllegalStateException: Job with ID
> 7f5a5f95353a2b486572f4cdefa813b8 is not stoppable.
>   at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:657)
>   at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>   at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stopping-the-job-with-ID-XXX-failed-tp13046.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: High Availability on Yarn

2017-05-08 Thread Stephan Ewen
@Ankit:

ZooKeeper is required in YARN setups still. Even if there is only one
JobManager in the normal case, Yarn can accidentally create a second one
when there is a network partition.
To prevent that this leads to inconsistencies, we use ZooKeeper.

Flink uses ZooKeeper very little, so you can just let Flink attach to any
existing ZooKeeper, or user one ZooKeeper cluster for very many Flink
clusters/jobs.

Stephan


On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek 
wrote:

> Hi,
> Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.
>
> Best,
> Aljoscha
>
> On 5. May 2017, at 16:56, Jain, Ankit  wrote:
>
> Thanks for the update Aljoscha.
>
> @Till Rohrmann ,
> Can you please chim in?
>
> Also, we currently have a long running EMR cluster where we create one
> flink cluster per job – can we just choose to install Zookeeper when
> creating the EMR cluster and use one Zookeeper instance for ALL of flink
> jobs?
> Or
> Recommendation is to have a dedicated Zookeeper instance per flink job?
>
> Thanks
> Ankit
>
> *From: *Aljoscha Krettek 
> *Date: *Thursday, May 4, 2017 at 1:19 AM
> *To: *"Jain, Ankit" 
> *Cc: *"user@flink.apache.org" , Till Rohrmann <
> trohrm...@apache.org>
> *Subject: *Re: High Availability on Yarn
>
> Hi,
> Yes, for YARN there is only one running JobManager. As far as I Know, In
> this case ZooKeeper is only used to keep track of checkpoint metadata and
> the execution graph of the running job. Such that a restoring JobManager
> can pick up the data again.
>
> I’m not 100 % sure on this, though, so maybe Till can shed some light on
> this.
>
> Best,
> Aljoscha
>
> On 3. May 2017, at 16:58, Jain, Ankit  wrote:
>
> Thanks for your reply Aljoscha.
>
> After building better understanding of Yarn and spending copious amount of
> time on Flink codebase, I think I now get how Flink & Yarn interact – I
> plan to document this soon in case it could help somebody starting afresh
> with Flink-Yarn.
>
> Regarding Zookeper, in YARN mode there is only one JobManager running, do
> we still need leader election?
>
> If the ApplicationMaster goes down (where JM runs) it is restarted by Yarn
> RM and while restarting, Flink AM will bring back previous running
> containers.  So, where does Zookeeper sit in this setup?
>
> Thanks
> Ankit
>
> *From: *Aljoscha Krettek 
> *Date: *Wednesday, May 3, 2017 at 2:05 AM
> *To: *"Jain, Ankit" 
> *Cc: *"user@flink.apache.org" , Till Rohrmann <
> trohrm...@apache.org>
> *Subject: *Re: High Availability on Yarn
>
> Hi,
> As a first comment, the work mentioned in the FLIP-6 doc you linked is
> still work-in-progress. You cannot use these abstractions yet without going
> into the code and setting up a cluster “by hand”.
>
> The documentation for one-step deployment of a Job to YARN is available
> here: https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/setup/yarn_setup.html#run-a-single-flink-job-on-yarn
> 
>
> Regarding your third question, ZooKeeper is mostly used for discovery and
> leader election. That is, JobManagers use it to decide who is the main JM
> and who are standby JMs. TaskManagers use it to discover the leading
> JobManager that they should connect to.
>
> I’m also cc’ing Till, who should know this stuff better and can maybe
> explain it in a bit more detail.
>
> Best,
> Aljoscha
>
> On 1. May 2017, at 18:59, Jain, Ankit  wrote:
>
> Hi fellow users,
> We are trying to straighten out high availability story for flink.
>
> Our setup includes a long running EMR cluster, job submission is a
> two-step process – 1) Flink cluster is first created using flink yarn
> client on the EMR cluster already running 2) Flink job is submitted.
>
> I also saw references that with 1.2, these two steps have been combined
> into 1 – is that change in FlinkYarnSessionCli.java? Can somebody point to
> documentation please?
>
> W/o worrying about Yarn RM (not Flink Yarn RM that seems to be newly
> introduced) failure for now, I want to understand first how task manager &
> job manager failures are handled.
>
> My questions-
> 1)   https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=65147077
> 
>  suggests a 

Re: AllWindowed vs Windowed with 1 key

2017-05-08 Thread Adrienne Kole
Hi,

Thanks for the reply. So I have 2 cases:

1. timeWindowAll (length, slide).reduce (...) (with parallelism = 1)
2. groupby(someField).timeWindow(length, slide). reduce(...)

Lets say case-1 global window, case-2 partitioned window. If I have only
one key (for case-2) and I set parallelism=1  for case-1, I would expect
that both cases have similar performance both in terms of latency and
throughput. However, partitioned windows outperform global ones by orders
of magnitude in terms of throughput.
I am using Flink 1.1.3.


Thanks,
Adrienne




On Mon, May 8, 2017 at 3:55 PM, Stefan Richter 
wrote:

> Hi,
>
> to answer this question, we would first need to know what you mean by
> „global windows“: using „windowAll()“ or „GlobalWindows“? Also, the answer
> might depend on the Flink version that you are using.
>
> Best,
> Stefan
>
> > Am 07.05.2017 um 23:23 schrieb Adrienne Kole :
> >
> > Hi,
> >
> > I am doing simple aggregation with a keyed and global windows in flink.
> > When I compare the keyed window aggregation with 1 key and global window
> (which has parallelism 1) I would expect that both of them would have
> similar performance.
> >
> > However, keyed stream with 1 key performs with 2x more throughput than
> global window.
> > My configuration is 8 node cluster, 16 core in each node, parallelism =
> 128.
> >
> > AFAIK, Flink doesn't manage skew by default and uses hash function to
> assign keys to partitions. So if I have 1 key only, it should go to only
> one partition always, which is semantically similar to global windows in
> flink.
> >
> > What can be the reason behind this difference in performance?
> >
> > Thanks,
> > Adrienne
>
>


Re: AllWindowed vs Windowed with 1 key

2017-05-08 Thread Stefan Richter
Hi,

to answer this question, we would first need to know what you mean by „global 
windows“: using „windowAll()“ or „GlobalWindows“? Also, the answer might depend 
on the Flink version that you are using.

Best,
Stefan

> Am 07.05.2017 um 23:23 schrieb Adrienne Kole :
> 
> Hi,
> 
> I am doing simple aggregation with a keyed and global windows in flink. 
> When I compare the keyed window aggregation with 1 key and global window 
> (which has parallelism 1) I would expect that both of them would have similar 
> performance. 
> 
> However, keyed stream with 1 key performs with 2x more throughput than global 
> window. 
> My configuration is 8 node cluster, 16 core in each node, parallelism = 128.
> 
> AFAIK, Flink doesn't manage skew by default and uses hash function to assign 
> keys to partitions. So if I have 1 key only, it should go to only one 
> partition always, which is semantically similar to global windows in flink.
> 
> What can be the reason behind this difference in performance?
> 
> Thanks,
> Adrienne



Stopping the job with ID XXX failed.

2017-05-08 Thread yunfan123
I can't stop the job, every time the exception like follows.

Retrieving JobManager.
Using address /10.6.192.141:6123 to connect to JobManager.


 The program finished with the following exception:

java.lang.Exception: Stopping the job with ID
7f5a5f95353a2b486572f4cdefa813b8 failed.
at org.apache.flink.client.CliFrontend.stop(CliFrontend.java:528)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1087)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1126)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1123)
at
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1123)
Caused by: java.lang.IllegalStateException: Job with ID
7f5a5f95353a2b486572f4cdefa813b8 is not stoppable.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:657)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stopping-the-job-with-ID-XXX-failed-tp13046.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: High Availability on Yarn

2017-05-08 Thread Aljoscha Krettek
Hi,
Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.

Best,
Aljoscha

> On 5. May 2017, at 16:56, Jain, Ankit  wrote:
> 
> Thanks for the update Aljoscha.
>  
> @Till Rohrmann ,
> Can you please chim in?
>  
> Also, we currently have a long running EMR cluster where we create one flink 
> cluster per job – can we just choose to install Zookeeper when creating the 
> EMR cluster and use one Zookeeper instance for ALL of flink jobs?
> Or
> Recommendation is to have a dedicated Zookeeper instance per flink job?
>  
> Thanks
> Ankit
>  
> From: Aljoscha Krettek 
> Date: Thursday, May 4, 2017 at 1:19 AM
> To: "Jain, Ankit" 
> Cc: "user@flink.apache.org" , Till Rohrmann 
> 
> Subject: Re: High Availability on Yarn
>  
> Hi, 
> Yes, for YARN there is only one running JobManager. As far as I Know, In this 
> case ZooKeeper is only used to keep track of checkpoint metadata and the 
> execution graph of the running job. Such that a restoring JobManager can pick 
> up the data again.
>  
> I’m not 100 % sure on this, though, so maybe Till can shed some light on this.
>  
> Best,
> Aljoscha
> On 3. May 2017, at 16:58, Jain, Ankit  > wrote:
>  
> Thanks for your reply Aljoscha.
>  
> After building better understanding of Yarn and spending copious amount of 
> time on Flink codebase, I think I now get how Flink & Yarn interact – I plan 
> to document this soon in case it could help somebody starting afresh with 
> Flink-Yarn.
>  
> Regarding Zookeper, in YARN mode there is only one JobManager running, do we 
> still need leader election?
>  
> If the ApplicationMaster goes down (where JM runs) it is restarted by Yarn RM 
> and while restarting, Flink AM will bring back previous running containers.  
> So, where does Zookeeper sit in this setup?
>  
> Thanks
> Ankit
>  
> From: Aljoscha Krettek >
> Date: Wednesday, May 3, 2017 at 2:05 AM
> To: "Jain, Ankit" >
> Cc: "user@flink.apache.org " 
> >, Till Rohrmann 
> >
> Subject: Re: High Availability on Yarn
>  
> Hi, 
> As a first comment, the work mentioned in the FLIP-6 doc you linked is still 
> work-in-progress. You cannot use these abstractions yet without going into 
> the code and setting up a cluster “by hand”.
>  
> The documentation for one-step deployment of a Job to YARN is available here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/yarn_setup.html#run-a-single-flink-job-on-yarn
>  
> 
>  
> Regarding your third question, ZooKeeper is mostly used for discovery and 
> leader election. That is, JobManagers use it to decide who is the main JM and 
> who are standby JMs. TaskManagers use it to discover the leading JobManager 
> that they should connect to.
>  
> I’m also cc’ing Till, who should know this stuff better and can maybe explain 
> it in a bit more detail.
>  
> Best,
> Aljoscha
> On 1. May 2017, at 18:59, Jain, Ankit  > wrote:
>  
> Hi fellow users,
> We are trying to straighten out high availability story for flink.
>  
> Our setup includes a long running EMR cluster, job submission is a two-step 
> process – 1) Flink cluster is first created using flink yarn client on the 
> EMR cluster already running 2) Flink job is submitted.
>  
> I also saw references that with 1.2, these two steps have been combined into 
> 1 – is that change in FlinkYarnSessionCli.java? Can somebody point to 
> documentation please?
>  
> W/o worrying about Yarn RM (not Flink Yarn RM that seems to be newly 
> introduced) failure for now, I want to understand first how task manager & 
> job manager failures are handled.
>  
> My questions-
> 1)   
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 
> 
>  suggests a new RM has been added and now there is one JobManager for each 
> job. Since Yarn RM will now talk to Flink RM( instead of JobManager 
> previously), will Yarn automatically restart failing Flink RM?
> 2)   Is there any documentation on 

Re: Can ValueState use generics?

2017-05-08 Thread Stephan Ewen
Please use

"new ValueStateDescriptor<>("mystate", TypeInformation.of(new TypeHint
>(){}));

That should work...

On Mon, May 8, 2017 at 1:11 PM, Chesnay Schepler  wrote:

> If you want to use generics you have to either provide a TypeInformation
> instead of a class or create a class that extends Tuple2(Integer,
> ObjectNode) and use it as the class argument.
>
>
> On 07.05.2017 15:14, yunfan123 wrote:
>
>> My process function is like :
>>
>>  private static class MergeFunction extends
>> RichProcessFunction, Tuple2> ObjectNode>> {
>>
>>  private ValueState> state;
>>
>>  @Override
>>  @SuppressWarnings("unchecked")
>>  public void open(Configuration parameters) throws Exception {
>>  state = getRuntimeContext().getState(new
>> ValueStateDescriptor<>("mystate",
>>  (Class>)
>> (Object)Tuple2.class));
>>  }
>> }
>>
>>
>> When I running the code:
>> 05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1)
>> switched
>> to FAILED
>> java.lang.RuntimeException: Cannot create full type information based on
>> the
>> given class. If the type has generics, please
>> at
>> org.apache.flink.api.common.state.StateDescriptor.(Sta
>> teDescriptor.java:124)
>> at
>> org.apache.flink.api.common.state.ValueStateDescriptor.> >(ValueStateDescriptor.java:101)
>> at
>> com.bytedance.flinkjob.activationSource.AppActivationSource$
>> MergeFunction.open(AppActivationSource.java:134)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.ope
>> nFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>> erator.open(AbstractUdfStreamOperator.java:112)
>> at
>> org.apache.flink.streaming.api.operators.ProcessOperator.ope
>> n(ProcessOperator.java:55)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>> perators(StreamTask.java:375)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:251)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
>> Tuple needs to be parameterized by using generics.
>> at
>> org.apache.flink.api.java.typeutils.TypeExtractor.createType
>> InfoWithTypeHierarchy(TypeExtractor.java:673)
>> at
>> org.apache.flink.api.java.typeutils.TypeExtractor.privateCre
>> ateTypeInfo(TypeExtractor.java:607)
>> at
>> org.apache.flink.api.java.typeutils.TypeExtractor.createType
>> Info(TypeExtractor.java:561)
>> at
>> org.apache.flink.api.java.typeutils.TypeExtractor.createType
>> Info(TypeExtractor.java:557)
>> at
>> org.apache.flink.api.common.state.StateDescriptor.(Sta
>> teDescriptor.java:122)
>> ... 9 more
>>
>> Can I use generics with ValueState?
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Can-ValueState-use-
>> generics-tp13038.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>>
>


Re: Can ValueState use generics?

2017-05-08 Thread Chesnay Schepler
If you want to use generics you have to either provide a TypeInformation 
instead of a class or create a class that extends Tuple2(Integer, 
ObjectNode) and use it as the class argument.


On 07.05.2017 15:14, yunfan123 wrote:

My process function is like :

 private static class MergeFunction extends
RichProcessFunction, Tuple2> {

 private ValueState> state;

 @Override
 @SuppressWarnings("unchecked")
 public void open(Configuration parameters) throws Exception {
 state = getRuntimeContext().getState(new
ValueStateDescriptor<>("mystate",
 (Class>)
(Object)Tuple2.class));
 }
}


When I running the code:
05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched
to FAILED
java.lang.RuntimeException: Cannot create full type information based on the
given class. If the type has generics, please
at
org.apache.flink.api.common.state.StateDescriptor.(StateDescriptor.java:124)
at
org.apache.flink.api.common.state.ValueStateDescriptor.(ValueStateDescriptor.java:101)
at
com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Tuple needs to be parameterized by using generics.
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673)
at
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557)
at
org.apache.flink.api.common.state.StateDescriptor.(StateDescriptor.java:122)
... 9 more

Can I use generics with ValueState?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-ValueState-use-generics-tp13038.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





[DISCUSS] Release 1.3.0 RC0 (Non voting, testing release candidate)

2017-05-08 Thread Robert Metzger
Hi Devs,

I've created a first non-voting release candidate for Flink 1.3.0.
Please use this RC to test as much as you can and provide feedback to the
Flink community. The more we find and fix now, the better Flink 1.3.0 wil
be :)

I've CC'ed the user@ mailing list to get more people to test it. DO NOT USE
THIS RELEASE CANDIDATE IN PRODUCTION.

I will prepare a google document to synchronize the testing effort a bit
more.

Depending on the number of issues we identify, I hope that we can do the
first VOTEing RC early next week.

-

The release commit is f94c002991dcce9f1104f8e61b43efb2f8247cb4, located
here: http://git-wip-us.apache.org/repos/asf/flink/commit/f94c0029

The artifacts are located here:
http://people.apache.org/~rmetzger/flink-1.3.0-rc0/

The maven staging repository is here:
https://repository.apache.org/content/repositories/orgapacheflink-1118

-

Happy testing!

Regards,
Robert


Re: Kafka 0.10 jaas multiple clients

2017-05-08 Thread Tzu-Li (Gordon) Tai
Hi Gwenhael,

Sorry for the very long delayed response on this.

As you noticed, the “KafkaClient” entry name seems to be a hardcoded thing on 
the Kafka side, so currently I don’t think what you’re asking for is possible.

It seems like this could be made possible with some of the new authentication 
features in Kafka 0.10 that seems related: [1] [2].

I’m not that deep into the authentication modules, but I’ll take a look and can 
keep you posted on this.
Also looping in Eron (in CC) who could perhaps provide more insight on this at 
the same time.

Cheers,
Gordon

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-83+-+Allow+multiple+SASL+authenticated+Java+clients+in+a+single+JVM+process
[2] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients

On 26 April 2017 at 8:48:20 PM, Gwenhael Pasquiers 
(gwenhael.pasqui...@ericsson.com) wrote:

Hello,

Up to now we’ve been using kafka with jaas (plain login/password) the following 
way:

-  yarnship the jaas file

-  add the jaas file name into “flink-conf.yaml” using property 
“env.java.opts”

 

How to support multiple secured kafka 0.10 consumers and producers (with 
different logins and password of course) ?

From what I saw in the kafka sources, the entry name “KafkaClient” is hardcoded…

Best Regards,

 

Gwenhaël PASQUIERS