How can I confirm a savepoint is used for a new job?

2018-03-21 Thread Hao Sun
Do we have any logs in the JM/TM that indicate the job is using the savepoint I
passed in when I submit the job?

Thanks


how does SQL mode work with PopularPlaces example?

2018-03-21 Thread James Yu
Hi,

I am following the PopularPlacesSQL example (
http://training.data-artisans.com/exercises/popularPlacesSql.html), but I
am unable to understand why the following statement will pick up events with
the START flag only.

"SELECT " +
"toCoords(cell), wstart, wend, isStart, popCnt " +
"FROM " +
"(SELECT " +
"cell, " +
"isStart, " +
"HOP_START(eventTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS wstart,
" +
"HOP_END(eventTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS wend, " +
"COUNT(isStart) AS popCnt " +
"FROM " +
"(SELECT " +
"eventTime, " +
"isStart, " +
"CASE WHEN isStart THEN toCellId(startLon, startLat) ELSE toCellId(endLon,
endLat) END AS cell " +
"FROM TaxiRides " +
"WHERE isInNYC(startLon, startLat) AND isInNYC(endLon, endLat)) " +
"GROUP BY cell, isStart, HOP(eventTime, INTERVAL '5' MINUTE, INTERVAL '15'
MINUTE)) " +
"WHERE popCnt > 20"

Since we can update state in processElement() when we do this with the low-level
ProcessFunction, how does SQL rule out the unpaired events?


---
James C.-C.Yu
+886988713275


Re: entrypoint for executing job in task manager

2018-03-21 Thread Steven Wu
Thanks, let me clarify the requirement. Sorry that it wasn't clear in the
original email.

Here is our setup.

these 3 dirs are added to classpath
* flink/lib: core flink jars (like flink-dist_2.11,
flink-shaded-hadoop2-uber)
* spaaslib: many jars pulled in our internal platform
* jobs: a single fat jar for app/job code lives here

running Flink
* "jobmanager.web.upload.dir" is configured to use the "jobs" dir above.
* we use REST api to submit job in a standalone cluster.

Here are the requirements for the two levels of init hooks:
1) provide a *JVM init hook* in the JobManager and TaskManager classes during JVM
startup for the user to extend. Right now, we override the main method and
others (as you were also suggesting). That is a little fragile, as it is
tight coupling. The JobManager and TaskManager classes don't seem to be
implemented for overriding. We have seen breaking changes when upgrading Flink
from 1.2 -> 1.3 -> 1.4.
2) *job init hook* during job execution. The jobmanager computes the job graph,
ships it to the taskmanager, which then executes the job (opens/runs operators).
  - my original email was about allowing the user (app developer) to supply additional
Guice binding modules at the taskmanager with a job init hook. Then we can
create a child injector with these additional modules. But the Guice child
injector has issues with de-duplicating modules and runs into duplicate binding
problems, so we decided not to pursue this route.
  - my colleague (Monal) mentioned another use case where we can leverage
such a job init hook at the taskmanager. E.g. inside the job init hook, we can decide
on and attach EBS volumes based on key group assignment. Our understanding is
that such key group assignment is calculated by the jobmanager during job
planning.

The problem with your suggestion of "InjectionUtil.installModulesIfNotYetInstalled()"
is that it will also be loaded/executed inside the
JobManager when it loads this class. It is just a minor issue though.

Thanks,
Steven


On Wed, Mar 21, 2018 at 9:47 AM, Stephan Ewen  wrote:

> It would be great to understand a bit more what the exact requirements
> here are, and what setup you use.
>
> I am not a dependency injection expert, so let me know if what I am
> suggesting here is complete bogus.
>
>
> *(1) Fix set of libraries for Dependency Injection, or dedicated container
> images per application*
>
> If you have a dedicated JM and TM Flink image that you build per job, I
> would assume that you also put all the required libraries directly into
> the lib folder, so everything is available on startup.
>
> In that case, could you just wrap the TM and JM main methods to first call
> the initialization methods to set up dependency injection?
>
> This would also work if you have container images that are not
> job-specific, but all the libraries relevant to dependency injection are
> part of the image (the lib folder).
>
> *(2) Generic container images, plus dynamic set of libraries for
> dependency injection*
>
> Assuming you do not have job-specific container images, and each
> application brings its own dependencies it wants to set up for dependency
> injection,
> we could look in the following direction.
>
> The dependencies need to be set up for each Task on the TaskManager,
> because each task potentially gets a dedicated classloader.
> Have you tried an approach like the following?
>
>   - Create a static dependency initializer utility class that has a static
> "installModulesIfNotYetInstalled()" method.
>
>   - Each class that you use should have as the first line a static
> initializer block that calls that utility:
>
> public class MyFunction implements MapFunction<B, A> {
>
> static {
> InjectionUtil.installModulesIfNotYetInstalled();
> }
>
> public A map(B value) {...}
>
> ...
> }
>
>
>   - You can probably create yourself a base class that does that from
> which all your functions extend.
>
>
> On Fri, Dec 22, 2017 at 11:23 AM, Piotr Nowojski 
> wrote:
>
>> I don’t think there is such a hook in the Flink code now. You will have to
>> work around this issue somehow in user space.
>>
>> Maybe you could make a contract that every operator, before touching
>> Guice, should call a static synchronized method `initializeGuiceContext`.
>> This method could search the classpath for classes with some specific
>> annotations, for example `@MyInitializationHook` and install/add all of
>> such hooks before actually using Guice?
>>
>> Piotrek
>>
>>
>> On 21 Dec 2017, at 17:49, Steven Wu  wrote:
>>
>> We use Guice for dependency injection. We need to install *additional*
>> Guice modules (for bindings) when setting up this static context of Guice
>> injector.
>>
>> Calling the static initializer from the operator open method won't really
>> help. Not all operators are implemented by the app developer who wants to
>> install additional Guice modules. E.g. the kafka source operator is
>> implemented/provided by our platform. I think the source operator will open
>> first, which means the app operator won't get 

Re: Strange behavior on filter, group and reduce DataSets

2018-03-21 Thread Fabian Hueske
Hi,

That was a bit too early.
I found an issue with my approach. Will come back once I have solved it.

Best, Fabian

2018-03-21 23:45 GMT+01:00 Fabian Hueske :

> Hi,
>
> I've opened a pull request [1] that should fix the problem.
> It would be great if you could try the change and report back whether it fixes
> the problem.
>
> Thank you,
> Fabian
>
> [1] https://github.com/apache/flink/pull/5742
>
> 2018-03-21 9:49 GMT+01:00 simone :
>
>> Hi all,
>>
>> an update: following Stephan's directives on how to diagnose the issue and
>> making Person immutable, the problem does not occur.
>>
>> Simone.
>>
>> On 20/03/2018 20:20, Stephan Ewen wrote:
>>
>> To diagnose that, can you please check the following:
>>
>>   - Change the Person data type to be immutable (final fields, no
>> setters, set fields in constructor instead). Does that make the problem go
>> away?
>>
>>   - Change the Person data type to not be a POJO by adding a dummy field
>> that is never used, but does not have a getter/setter. Does that make
>> the problem go away?
>>
>> If either of that is the case, it must be a mutability bug somewhere in
>> either accidental object reuse or accidental serializer sharing.
>>
>>
>> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske  wrote:
>>
>>> Hi Simone and Flavio,
>>>
>>> I created FLINK-9031 [1] for this issue.
>>> Please have a look and add any detail that you think could help to
>>> resolve the problem.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>>
>>> 2018-03-19 16:35 GMT+01:00 simone :
>>>
 Hi Fabian,

 This simple code reproduces the behavior ->
 https://github.com/xseris/Flink-test-union

 Thanks, Simone.

 On 19/03/2018 15:44, Fabian Hueske wrote:

 Hmmm, I still don't see the problem.
 IMO, the result should be correct for both plans. The data is
 replicated, filtered, reduced, and unioned.
 There is nothing in between the filter and reduce, that could cause
 incorrect behavior.

 The good thing is, the optimizer seems to be fine. The bad thing is, it
 is either the Flink runtime code or your functions.
 Given that one plan produces good results, it might be the Flink
 runtime code.

 Coming back to my previous question.
 Can you provide a minimal program to reproduce the issue?

 Thanks, Fabian

 2018-03-19 15:15 GMT+01:00 Fabian Hueske :

> Ah, thanks for the update!
> I'll have a look at that.
>
> 2018-03-19 15:13 GMT+01:00 Fabian Hueske :
>
>> HI Simone,
>>
>> Looking at the plan, I don't see why this should be happening. The
>> pseudo code looks fine as well.
>> Any chance that you can create a minimal program to reproduce the
>> problem?
>>
>> Thanks,
>> Fabian
>>
>> 2018-03-19 12:04 GMT+01:00 simone :
>>
>>> Hi Fabian,
>>>
>>> reuse is not enabled. I attach the plan of the execution.
>>>
>>> Thanks,
>>> Simone
>>>
>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>
>>> Hi,
>>>
>>> Union is actually a very simple operator (not even an operator in
>>> Flink terms). It just merges two inputs. There is no additional logic
>>> involved.
>>> Therefore, it should also not emit records before either of both
>>> ReduceFunctions sorted its data.
>>> Once the data has been sorted for the ReduceFunction, the data is
>>> reduced and emitted in a pipelined fashion, i.e., once the first record 
>>> is
>>> reduced, it is forwarded into the MapFunction (passing the unioned 
>>> inputs).
>>> So it is not unexpected that Map starts processing before the
>>> ReduceFunction terminated.
>>>
>>> Did you enable object reuse [1]?
>>> If yes, try to disable it. If you want to reuse objects, you have to
>>> be careful in how you implement your functions.
>>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>>> that was generated for the program?
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> dev/batch/index.html#operating-on-data-objects-in-functions
>>>
>>>
>>>
>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>>>
 Any help on this? This thing is very strange... the "manual" union of
 the output of the 2 datasets is different from the flink-union of
 them...
 Could it be a problem of the flink optimizer?

 Best,
 Flavio

 On Fri, Mar 16, 2018 at 4:01 PM, simone <
 simone.povosca...@gmail.com> wrote:

> Sorry, I translated the code into pseudocode too fast. That is
> indeed an equals.
>
> On 16/03/2018 15:58, Kien Truong wrote:
>
> Hi,
>
 Just a guess, but string comparison in Java should use the equals
 method, not == 

Re: Strange behavior on filter, group and reduce DataSets

2018-03-21 Thread Fabian Hueske
Hi,

I've opened a pull request [1] that should fix the problem.
It would be great if you could try the change and report back whether it fixes
the problem.

Thank you,
Fabian

[1] https://github.com/apache/flink/pull/5742

2018-03-21 9:49 GMT+01:00 simone :

> Hi all,
>
> an update: following Stephan's directives on how to diagnose the issue and
> making Person immutable, the problem does not occur.
>
> Simone.
>
> On 20/03/2018 20:20, Stephan Ewen wrote:
>
> To diagnose that, can you please check the following:
>
>   - Change the Person data type to be immutable (final fields, no setters,
> set fields in constructor instead). Does that make the problem go away?
>
>   - Change the Person data type to not be a POJO by adding a dummy field
> that is never used, but does not have a getter/setter. Does that make the
> problem go away?
>
> If either of that is the case, it must be a mutability bug somewhere in
> either accidental object reuse or accidental serializer sharing.
>
>
> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske  wrote:
>
>> Hi Simone and Flavio,
>>
>> I created FLINK-9031 [1] for this issue.
>> Please have a look and add any detail that you think could help to
>> resolve the problem.
>>
>> Thanks,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>
>> 2018-03-19 16:35 GMT+01:00 simone :
>>
>>> Hi Fabian,
>>>
>>> This simple code reproduces the behavior ->
>>> https://github.com/xseris/Flink-test-union
>>>
>>> Thanks, Simone.
>>>
>>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>>
>>> Hmmm, I still don't see the problem.
>>> IMO, the result should be correct for both plans. The data is
>>> replicated, filtered, reduced, and unioned.
>>> There is nothing in between the filter and reduce, that could cause
>>> incorrect behavior.
>>>
>>> The good thing is, the optimizer seems to be fine. The bad thing is, it
>>> is either the Flink runtime code or your functions.
>>> Given that one plan produces good results, it might be the Flink runtime
>>> code.
>>>
>>> Coming back to my previous question.
>>> Can you provide a minimal program to reproduce the issue?
>>>
>>> Thanks, Fabian
>>>
>>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske :
>>>
 Ah, thanks for the update!
 I'll have a look at that.

 2018-03-19 15:13 GMT+01:00 Fabian Hueske :

> HI Simone,
>
> Looking at the plan, I don't see why this should be happening. The
> pseudo code looks fine as well.
> Any chance that you can create a minimal program to reproduce the
> problem?
>
> Thanks,
> Fabian
>
> 2018-03-19 12:04 GMT+01:00 simone :
>
>> Hi Fabian,
>>
>> reuse is not enabled. I attach the plan of the execution.
>>
>> Thanks,
>> Simone
>>
>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>
>> Hi,
>>
>> Union is actually a very simple operator (not even an operator in
>> Flink terms). It just merges two inputs. There is no additional logic
>> involved.
>> Therefore, it should also not emit records before either of both
>> ReduceFunctions sorted its data.
>> Once the data has been sorted for the ReduceFunction, the data is
>> reduced and emitted in a pipelined fashion, i.e., once the first record 
>> is
>> reduced, it is forwarded into the MapFunction (passing the unioned 
>> inputs).
>> So it is not unexpected that Map starts processing before the
>> ReduceFunction terminated.
>>
>> Did you enable object reuse [1]?
>> If yes, try to disable it. If you want to reuse objects, you have to
>> be careful in how you implement your functions.
>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>> that was generated for the program?
>>
>> Thanks,
>> Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/batch/index.html#operating-on-data-objects-in-functions
>>
>>
>>
>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>>
>>> Any help on this? This thing is very strange... the "manual" union of
>>> the output of the 2 datasets is different from the flink-union of them...
>>> Could it be a problem of the flink optimizer?
>>>
>>> Best,
>>> Flavio
>>>
>>> On Fri, Mar 16, 2018 at 4:01 PM, simone >> > wrote:
>>>
 Sorry, I translated the code into pseudocode too fast. That is
 indeed an equals.

 On 16/03/2018 15:58, Kien Truong wrote:

 Hi,

 Just a guess, but string comparison in Java should use the equals
 method, not the == operator.

 Regards,

 Kien


 On 3/16/2018 9:47 PM, simone wrote:

 *subject.getField("field1") == "";*



>>>
>>>
>>
>>
>

>>>
>>>
>>
>
>
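For context, the immutability change Stephan suggests amounts to roughly the
following (a sketch only; Person's real fields are not shown in the thread, so
"name" and "score" are hypothetical placeholders):

// Hypothetical immutable variant of the Person type under discussion:
// final fields, no setters, values assigned only in the constructor.
public final class Person {

    private final String name;   // placeholder field
    private final double score;  // placeholder field

    public Person(String name, double score) {
        this.name = name;
        this.score = score;
    }

    public String getName() {
        return name;
    }

    public double getScore() {
        return score;
    }
}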


InterruptedException when async function is cancelled

2018-03-21 Thread Ken Krugler
Hi all,

When I cancel a job that has async functions, I see this sequence in the 
TaskManager logs:

2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task 
- Attempting to cancel task AsyncFunctionName (1/1) 
(fcb7bbe7cd89f1167f8a656b0f2fdaf9).
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task 
- AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) 
switched from RUNNING to CANCELING.
2018-03-21 14:51:34,471 INFO  org.apache.flink.runtime.taskmanager.Task 
- Triggering cancellation of task code AsyncFunctionName (1/1) 
(fcb7bbe7cd89f1167f8a656b0f2fdaf9).

And then less than a second later...

2018-03-21 14:51:35,315 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Could not shut 
down timer service
java.lang.InterruptedException
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
at 
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:747)

Followed shortly thereafter by a call to the async function’s close() method, 
which logs:

2018-03-21 14:51:35,334 DEBUG com.scaleunlimited.utils.ThreadedExecutor 
 - Shutting down the AsyncFunctionName thread pool 

And finally…

2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task 
- AsyncFunctionName (1/1) (fcb7bbe7cd89f1167f8a656b0f2fdaf9) 
switched from CANCELING to CANCELED.
2018-03-21 14:51:35,340 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for AsyncFunctionName (1/1) 
(fcb7bbe7cd89f1167f8a656b0f2fdaf9).

I’ve looked through the code, and I don’t see any place where I’m interrupting 
any threads. When I shut down my own thread pool, interrupts will be generated, 
but only for threads used by my pool, and this happens after the 
InterruptedException.
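For reference, the pool shutdown described above looks roughly like the sketch
below (the ThreadedExecutor internals are not shown in this thread, so a plain
java.util.concurrent.ExecutorService stands in):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of a close()-time pool shutdown: shutdownNow() interrupts only the
// pool's own worker threads, not the task thread that invokes close().
public class PoolShutdownSketch {

    public static void shutdownQuietly(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }
}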

Is this a known issue? Or is there something I can do to avoid it?

Thanks,

— Ken 

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Out of memory when catching up

2018-03-21 Thread Lasse Nedergaard
Hi. 

When our jobs are catching up they read at 10-20 times the normal rate,
but then we lose our task managers with OOM. We could increase the memory
allocation, but is there a way to figure out how high a rate we can consume with
the current memory and slot allocation, and a way to limit the input to avoid OOM?

Med venlig hilsen / Best regards
Lasse Nedergaard



Re: ListCheckpointed function - what happens prior to restoreState() being called?

2018-03-21 Thread Ken Krugler
Hi Fabian,

> On Mar 20, 2018, at 6:38 AM, Fabian Hueske  wrote:
> 
> Hi Ken,
> 
> The documentation page describes that first the state is restored / 
> initialized and then the function's open() method is called [1].

Yes, thanks - my question was about ListCheckpointed.restoreState(), which 
doesn’t seem to be described on that page.

> I had a look at the code and it looks like the docs are correct [2]

Thanks for the gentle push to look at the code :)

I see that in
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java#L173,
if CheckpointedFunction is implemented then initializeState() is called,
otherwise if ListCheckpointed is implemented then restoreState() is called.
Essentially they are equivalent, and it's one or the other.

So the net-net is that I need to make sure my state variables are allocated in 
my constructor, not in my open method.

What tripped me up is that my state variables are transient, as I don’t want 
them serialized. Years of Cascading coding has made this a reflex action, where 
you then set up all of your transient variables in the open() call.

But that’s not correct in this case. So they have to be allocated in the 
constructor.
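A minimal sketch of that ordering point (assuming the ListCheckpointed interface
from flink-streaming-java; the buffer is created eagerly in the field
initializer, i.e. at construction, so restoreState() always has a live target
even though it runs before open()):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

// Sketch: the state holder is allocated at construction, not in open(),
// because restoreState() is called before open().
public class BufferingMapper implements MapFunction<String, String>, ListCheckpointed<String> {

    // Allocated eagerly so restoreState() always has a collection to fill.
    private final List<String> buffer = new ArrayList<>();

    @Override
    public String map(String value) {
        buffer.add(value);
        return value;
    }

    @Override
    public List<String> snapshotState(long checkpointId, long timestamp) {
        return new ArrayList<>(buffer);
    }

    @Override
    public void restoreState(List<String> state) {
        buffer.clear();
        buffer.addAll(state);
    }
}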

Regards,

— Ken

> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/task_lifecycle.html#normal-execution
>  
> 
> [2] 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L296
>  
> 
> 
> 2018-03-20 0:49 GMT+01:00 Ken Krugler  >:
> Hi all,
> 
> If I implement ListCheckpointed in a function, is there a guarantee that 
> open() is called before restoreState()?
> 
> Asking because it doesn’t seem to be the case, and I didn’t notice this being 
> described here:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/task_lifecycle.html
>  
> 
> 
> If so, then is it normally best practice to also implement the 
> CheckpointedFunction interface, so that initializeState() method is called 
> before the restore?
> 
> In Flink test code, I don’t see this typically being done.
> 
> Thanks,
> 
> — Ken


http://about.me/kkrugler
+1 530-210-6378



Re: Confluent Schema Registry DeserializationSchema

2018-03-21 Thread dim5b
I added kafka to my dependencies, although I am not sure why this would be
required... it seems to work.


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_${kafka.scala.version}</artifactId>
    <version>${kafka.version}</version>
</dependency>


This is my full dependency list...





<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>eu.neurocom</groupId>
        <artifactId>mip-model-poc</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${kafka.scala.version}</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>



This does solve the issue, but now I am getting the following error...


java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
cannot be cast to eu.neurocom.avro.CelloAvro
at
eu.neurocom.schema.ConfluentAvroDeserializationSchema.deserialize(ConfluentAvroDeserializationSchema.java:37)
at
eu.neurocom.schema.ConfluentAvroDeserializationSchema.deserialize(ConfluentAvroDeserializationSchema.java:16)
at
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652)





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error running on Hadoop 2.7

2018-03-21 Thread ashish pok
Hi Piotrek,
At this point we are simply trying to start a YARN session. 
BTW, we are on Hortonworks HDP 2.6 which is on 2.7 Hadoop if anyone has 
experienced similar issues. 
We actually pulled 2.6 binaries for the heck of it and ran into the same issues.
I guess we are left with getting the non-hadoop binaries and setting HADOOP_CLASSPATH
then?

-- Ashish 
 
  On Wed, Mar 21, 2018 at 12:03 PM, Piotr Nowojski 
wrote:   Hi,
> Does some simple word count example work on the cluster after the upgrade?
If not, maybe your job is pulling some dependency that’s causing this version 
conflict?
Piotrek


On 21 Mar 2018, at 16:52, ashish pok  wrote:
Hi Piotrek,
Yes, this is a brand new Prod environment. 2.6 was in our lab.
Thanks,

-- Ashish 
 
  On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski 
wrote:   Hi,
Have you replaced all of your old Flink binaries with freshly downloaded Hadoop
2.7 versions? Are you sure that nothing got mixed up in the process?
Does some simple word count example work on the cluster after the upgrade?
Piotrek


On 21 Mar 2018, at 16:11, ashish pok  wrote:
Hi All,
We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to
2.7. It was supposed to be an easy lift to get a YARN session but it doesn't seem
like it :) We definitely are using 2.7 binaries, but it looks like there is a call
here to a private method, which screams runtime incompatibility.
Has anyone seen this and has pointers?
Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
 from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at 
org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
    at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)



  


  


Re: entrypoint for executing job in task manager

2018-03-21 Thread Stephan Ewen
It would be great to understand a bit more what the exact requirements here
are, and what setup you use.

I am not a dependency injection expert, so let me know if what I am
suggesting here is complete bogus.


*(1) Fix set of libraries for Dependency Injection, or dedicated container
images per application*

If you have a dedicated JM and TM Flink image that you build per job, I
would assume that you also put all the required libraries directly into
the lib folder, so everything is available on startup.

In that case, could you just wrap the TM and JM main methods to first call
the initialization methods to set up dependency injection?

This would also work if you have container images that are not
job-specific, but all the libraries relevant to dependency injection are
part of the image (the lib folder).

*(2) Generic container images, plus dynamic set of libraries for dependency
injection*

Assuming you do not have job-specific container images, and each
application brings its own dependencies it wants to set up for dependency
injection,
we could look in the following direction.

The dependencies need to be set up for each Task on the TaskManager,
because each task potentially gets a dedicated classloader.
Have you tried an approach like the following?

  - Create a static dependency initializer utility class that has a static
"installModulesIfNotYetInstalled()" method.

  - Each class that you use should have as the first line a static
initializer block that calls that utility:

public class MyFunction implements MapFunction<B, A> {

static {
InjectionUtil.installModulesIfNotYetInstalled();
}

public A map(B value) {...}

...
}


  - You can probably create yourself a base class that does that from which
all your functions extend.
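For illustration, the utility referenced above could look roughly like the
following (a hypothetical sketch against Guice's public API; the class name
comes from the snippet above, and the module list is application-specific, so
it is left empty here):

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;

// Hypothetical sketch: installs the Guice modules at most once per classloader,
// no matter how many functions' static initializers call it.
public final class InjectionUtil {

    private static volatile Injector injector;

    private InjectionUtil() {}

    public static void installModulesIfNotYetInstalled() {
        if (injector == null) {
            synchronized (InjectionUtil.class) {
                if (injector == null) {
                    // Application-specific modules would be passed here; empty in this sketch.
                    injector = Guice.createInjector(new Module[0]);
                }
            }
        }
    }

    public static Injector getInjector() {
        return injector;
    }
}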


On Fri, Dec 22, 2017 at 11:23 AM, Piotr Nowojski 
wrote:

> I don’t think there is such a hook in the Flink code now. You will have to
> work around this issue somehow in user space.
>
> Maybe you could make a contract that every operator, before touching Guice,
> should call a static synchronized method `initializeGuiceContext`. This
> method could search the classpath for classes with some specific
> annotations, for example `@MyInitializationHook` and install/add all of
> such hooks before actually using Guice?
>
> Piotrek
>
>
> On 21 Dec 2017, at 17:49, Steven Wu  wrote:
>
> We use Guice for dependency injection. We need to install *additional*
> Guice modules (for bindings) when setting up this static context of Guice
> injector.
>
> Calling the static initializer from the operator open method won't really
> help. Not all operators are implemented by the app developer who wants to
> install additional Guice modules. E.g. the kafka source operator is
> implemented/provided by our platform. I think the source operator will open
> first, which means the app operator won't get a chance to initialize the static
> context. What would really help is if there were an entry hook (at the task manager)
> that is executed before any operator opens.
>
> On Thu, Dec 21, 2017 at 12:27 AM, Piotr Nowojski 
> wrote:
>
>> The open method is called just before any elements are processed. You can
>> hook in any initialisation logic there, including initialisation of a
>> static context. However, keep in mind that since this context is static, it
>> will be shared between multiple operators (if you are running parallelism >
>> number of task managers), so accesses to it must be synchronized (including
>> initialisation). Another thing to consider is that managing the life cycle
>> of a static context can be tricky (when to close it and release its
>> resources).
>>
>> The questions is, whether you really need a static context?
>>
>> Thanks,
>> Piotrek
>>
>>
>> > On 21 Dec 2017, at 07:53, Steven Wu  wrote:
>> >
>> > Here is my understanding of how job submission works in Flink. When
>> submitting a job to the job manager via the REST API, we provide an entry class. The job
>> manager then evaluates the job graph and ships serialized operators to the task
>> manager. The task manager then opens operators and runs tasks.
>> >
>> > My app would typically require some initialization phase to set up my
>> own running context in the task manager (e.g. calling a static method of some
>> class). Does Flink provide any entry hook in the task manager when executing a
>> job (and tasks)? As for the job manager, the entry class provides such a hook
>> where I can initialize my static context.
>> >
>> > Thanks,
>> > Steven
>>
>>
>
>


Re: Error running on Hadoop 2.7

2018-03-21 Thread Piotr Nowojski
Hi,

> Does some simple word count example work on the cluster after the upgrade?

If not, maybe your job is pulling some dependency that’s causing this version 
conflict?

Piotrek

> On 21 Mar 2018, at 16:52, ashish pok  wrote:
> 
> Hi Piotrek,
> 
> Yes, this is a brand new Prod environment. 2.6 was in our lab.
> 
> Thanks,
> 
> -- Ashish
> 
> On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski
>  wrote:
> Hi,
> 
> Have you replaced all of your old Flink binaries with freshly downloaded
> Hadoop 2.7 versions? Are you sure
> that nothing got mixed up in the process?
> 
> Does some simple word count example work on the cluster after the upgrade?
> 
> Piotrek
> 
>> On 21 Mar 2018, at 16:11, ashish pok > > wrote:
>> 
>> Hi All,
>> 
>> We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to
>> 2.7. It was supposed to be an easy lift to get a YARN session but it doesn't
>> seem like it :) We definitely are using 2.7 binaries, but it looks like there is
>> a call here to a private method, which screams runtime incompatibility.
>> 
>> Has anyone seen this and has pointers?
>> 
>> Thanks, Ashish
>> Exception in thread "main" java.lang.IllegalAccessError: tried to access 
>> method 
>> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
>>  from class 
>> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
>> at 
>> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
>> at 
>> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
>> at 
>> org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
>> at 
>> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
>> at 
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
>> at 
>> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
>> at 
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
>> at 
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
>> at 
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
>> at 
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
>> at 
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
>> at 
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at 
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
>> 
> 



Re: Confluent Schema Registry DeserializationSchema

2018-03-21 Thread Piotr Nowojski
Hi,

It looks to me like kafka.utils.VerifiableProperties comes from the
org.apache.kafka:kafka package - please check and, if possible, resolve
dependency conflicts in your pom.xml regarding this package. Probably there is
some version collision.

Piotrek

> On 21 Mar 2018, at 16:40, dim5b  wrote:
> 
> I am trying to connect to the schema registry and deserialize the data.
> 
> I am building my project and on mvn build I get the error
> 
> class file for kafka.utils.VerifiableProperties not found...
> 
> 
> import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
> import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
> import io.confluent.kafka.serializers.KafkaAvroDecoder;
> import org.apache.flink.api.common.serialization.DeserializationSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> 
> 
> public class ConfluentAvroDeserializationSchema implements
> DeserializationSchema<CelloAvro> {
> 
>private final String schemaRegistryUrl;
>private final int identityMapCapacity;
>private KafkaAvroDecoder kafkaAvroDecoder;
> 
>public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
>this(schemaRegistyUrl, 1000);
>}
> 
>public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int
> identityMapCapacity) {
>this.schemaRegistryUrl = schemaRegistryUrl;
>this.identityMapCapacity = identityMapCapacity;
>}
> 
>@Override
>public CelloAvro deserialize(byte[] bytes) throws IOException {
>if (kafkaAvroDecoder == null) {
>SchemaRegistryClient schemaRegistry = new
> CachedSchemaRegistryClient(this.schemaRegistryUrl,
> this.identityMapCapacity);
>this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
>}
>return (CelloAvro) this.kafkaAvroDecoder.fromBytes(bytes);
>}
> 
>@Override
>public boolean isEndOfStream(CelloAvro celloAvro) {
>return false;
>}
> 
>@Override
>public TypeInformation<CelloAvro> getProducedType() {
>return TypeExtractor.getForClass(CelloAvro.class);
>}
> }
> 
> My dependencies are:
> 
> 
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-avro</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> 
> <dependency>
>   <groupId>io.confluent</groupId>
>   <artifactId>kafka-avro-serializer</artifactId>
>   <version>4.0.0</version>
> </dependency>
> 
> 
> Could someone please help? I see there is an open issue for an end-to-end
> test with Confluent's Schema Registry
> 
> https://issues.apache.org/jira/browse/FLINK-8970
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Error running on Hadoop 2.7

2018-03-21 Thread ashish pok
Hi Piotrek,
Yes, this is a brand new Prod environment. 2.6 was in our lab.
Thanks,

-- Ashish 
 
  On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski 
wrote:   Hi,
Have you replaced all of your old Flink binaries with freshly downloaded Hadoop
2.7 versions? Are you sure that nothing got mixed up in the process?
Does some simple word count example work on the cluster after the upgrade?
Piotrek


On 21 Mar 2018, at 16:11, ashish pok  wrote:
Hi All,
We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to
2.7. It was supposed to be an easy lift to get a YARN session but it doesn't seem
like it :) We definitely are using 2.7 binaries, but it looks like there is a call
here to a private method, which screams runtime incompatibility.
Has anyone seen this and has pointers?
Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
 from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at 
org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
    at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)



  


Re: Record Delivery Guarantee with Kafka 1.0.0

2018-03-21 Thread Stephan Ewen
This should be handled by Flink. The system does flush records on
checkpoints and does not confirm a checkpoint before all flushes are acked
back.

Did you turn on checkpointing? Without that, Flink cannot give guarantees
for exactly the reason you outlined above.
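For reference, a minimal sketch of that setup with the 0.11 connector discussed
in this thread (broker address and topic are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class AtLeastOnceKafkaSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without checkpointing, the producer is never asked to flush on a consistent point.
        env.enableCheckpointing(10_000L);

        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

        FlinkKafkaProducer011<String> sink = new FlinkKafkaProducer011<>(
                "output-topic",                                            // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                producerConfig,
                FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("at-least-once kafka sink sketch");
    }
}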



On Wed, Mar 14, 2018 at 9:34 PM, Aljoscha Krettek 
wrote:

> Hi,
>
> How are you checking that records are missing? Flink should flush to Kafka
> and wait for all records to be flushed when performing a checkpoint.
>
> Best,
> Aljoscha
>
> On 13. Mar 2018, at 21:31, Chirag Dewan  wrote:
>
> Hi,
>
> Still stuck around this.
>
> My understanding is, this is something Flink can't handle. If the
> batch-size of the Kafka Producer is non-zero (which it ideally should be), there
> will be in-memory records and data loss (boundary cases). The only way I can
> handle this with Flink is my checkpointing interval, which flushes any
> buffered records.
>
> Is my understanding correct here? Or am I still missing something?
>
> thanks,
>
> Chirag
>
> On Monday, 12 March, 2018, 12:59:51 PM IST, Chirag Dewan <
> chirag.dewa...@yahoo.in> wrote:
>
>
> Hi,
>
> I am trying to use Kafka Sink 0.11 with ATLEAST_ONCE semantic and
> experiencing some data loss on Task Manager failure.
>
> It's a simple job with parallelism=1 and a single Task Manager. After a few
> checkpoints (kafka flushes) I kill one of my Task Managers running as a
> container on Docker Swarm.
>
> I observe a small number of records, usually 4-5, being lost on Kafka
> broker(1 broker cluster, 1 topic with 1 partition).
>
> My FlinkKafkaProducer config are as follows :
>
> batch.size=default(16384)
> retries=3
> max.in.flight.requests.per.connection=1
> acks=1
>
> As I understand it, all the messages batched by 
> KafkaProducer(RecordAccumulator)
> in the memory-buffer, are lost. Is this why I cant see my records on the
> broker? Or is there something I am doing terribly wrong? Any help
> appreciated.
>
> TIA,
>
> Chirag
>
>
>
>
>
>


Confluent Schema Registry DeserializationSchema

2018-03-21 Thread dim5b
I am trying to connect to the schema registry and deserialize the data.

I am building my project and on mvn build I get the error

 class file for kafka.utils.VerifiableProperties not found...


import java.io.IOException;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;


public class ConfluentAvroDeserializationSchema implements
DeserializationSchema<CelloAvro> {

private final String schemaRegistryUrl;
private final int identityMapCapacity;
private KafkaAvroDecoder kafkaAvroDecoder;

public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
this(schemaRegistyUrl, 1000);
}

public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int
identityMapCapacity) {
this.schemaRegistryUrl = schemaRegistryUrl;
this.identityMapCapacity = identityMapCapacity;
}

@Override
public CelloAvro deserialize(byte[] bytes) throws IOException {
if (kafkaAvroDecoder == null) {
SchemaRegistryClient schemaRegistry = new
CachedSchemaRegistryClient(this.schemaRegistryUrl,
this.identityMapCapacity);
this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
}
return (CelloAvro) this.kafkaAvroDecoder.fromBytes(bytes);
}

@Override
public boolean isEndOfStream(CelloAvro celloAvro) {
return false;
}

@Override
public TypeInformation<CelloAvro> getProducedType() {
return TypeExtractor.getForClass(CelloAvro.class);
}
}
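For context, the schema above would typically be wired into the Kafka source
roughly like this (a sketch; the topic, registry URL and consumer properties
are placeholders):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class ConsumerWiringSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "test-group");              // placeholder group

        DataStream<CelloAvro> stream = env.addSource(new FlinkKafkaConsumer011<>(
                "input-topic",                                                    // placeholder topic
                new ConfluentAvroDeserializationSchema("http://localhost:8081"),  // placeholder registry URL
                props));

        stream.print();
        env.execute("confluent avro consumer sketch");
    }
}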

My dependencies are:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>4.0.0</version>
</dependency>



Could someone please help? I see there is an open issue for an end-to-end
test with Confluent's Schema Registry

https://issues.apache.org/jira/browse/FLINK-8970






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error running on Hadoop 2.7

2018-03-21 Thread Piotr Nowojski
Hi,

Have you replaced all of your old Flink binaries with freshly downloaded
Hadoop 2.7 versions? Are you sure
that nothing got mixed up in the process?

Does some simple word count example work on the cluster after the upgrade?

Piotrek

> On 21 Mar 2018, at 16:11, ashish pok  wrote:
> 
> Hi All,
> 
> We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to
> 2.7. It was supposed to be an easy lift to get a YARN session but it doesn't seem
> like it :) We definitely are using 2.7 binaries, but it looks like there is a
> call here to a private method, which screams runtime incompatibility.
> 
> Has anyone seen this and has pointers?
> 
> Thanks, Ashish
> Exception in thread "main" java.lang.IllegalAccessError: tried to access 
> method 
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
>  from class 
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
> at 
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
> at 
> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
> at 
> org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
> at 
> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
> at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
> at 
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
> 



Re: Standalone cluster instability

2018-03-21 Thread Piotr Nowojski
Hi,

Does the issue really happen after 48 hours? 
Is there some indication of a failure in TaskManager log?

If you will be still unable to solve the problem, please provide full 
TaskManager and JobManager logs.

Piotrek

> On 21 Mar 2018, at 16:00, Alexander Smirnov  
> wrote:
> 
> One more question - I see a lot of lines like the following in the logs
> 
> [2018-03-21 00:30:35,975] ERROR Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:35320 
> ] with UID [1500204560] 
> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
> [2018-03-21 00:34:15,208] WARN Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:41068 
> ] with unknown UID is 
> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
> gating instead for 5000 ms. (akka.remote.Remoting)
> [2018-03-21 00:34:15,235] WARN Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:40677 
> ] with unknown UID is 
> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
> gating instead for 5000 ms. (akka.remote.Remoting)
> [2018-03-21 00:34:15,256] WARN Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:40382 
> ] with unknown UID is 
> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
> gating instead for 5000 ms. (akka.remote.Remoting)
> [2018-03-21 00:34:15,256] WARN Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:44744 
> ] with unknown UID is 
> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
> gating instead for 5000 ms. (akka.remote.Remoting)
> [2018-03-21 00:34:15,266] WARN Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413 
> ] with unknown UID is 
> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
> gating instead for 5000 ms. (akka.remote.Remoting)
> 
> 
> The host is available, but I don't understand where port number comes from. 
> Task Manager uses another port (which is printed in logs on startup)
> Could you please help to understand why it happens?
> 
> Thank you,
> Alex
> 
> 
> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov 
> mailto:alexander.smirn...@gmail.com>> wrote:
> Hello,
> 
> I've assembled a standalone cluster of 3 task managers and 3 job managers(and 
> 3 ZK) following the instructions at 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html
>  
> 
>  and 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html
>  
> 
> 
> It works ok, but randomly, task managers becomes unavailable. JobManager has 
> exception like below in logs:
> 
> 
> [2018-03-19 00:33:10,211] WARN Association with remote system 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413 
> ] has failed, address is 
> now gated for [5000] ms. Reason: [Association failed with 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413 
> ]] Caused by: [Connection 
> refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413 
> ] 
> (akka.remote.ReliableDeliverySupervisor)
> [2018-03-21 00:30:35,975] ERROR Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:35320 
> ] with UID [1500204560] 
> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
> java.util.concurrent.TimeoutException: Remote system has been silent for too 
> long. (more than 48.0 hours)
> at 
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at 
> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  

Error running on Hadoop 2.7

2018-03-21 Thread ashish pok
Hi All,
We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to
2.7. It was supposed to be an easy lift to get a YARN session but it doesn't seem
like it :) We definitely are using 2.7 binaries, but it looks like there is a call
here to a private method, which screams runtime incompatibility.
Has anyone seen this and has pointers?
Thanks, Ashish

Exception in thread "main" java.lang.IllegalAccessError: tried to access method 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
 from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
    at 
org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
    at 
org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
    at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
    at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
   


Re: Migration to Flip6 Kubernetes

2018-03-21 Thread Eron Wright
It would be helpful to expand on how, in job mode, the job graph would be
produced. The phrase 'which contains the single job you want to execute'
has a few meanings; I believe Till means a serialized job graph, not an
executable JAR w/ main method. Till, is that correct?

On Tue, Mar 20, 2018 at 2:16 AM, Till Rohrmann  wrote:

> Hi Edward,
>
> you're right that Flink's Kubernetes documentation has not been updated
> with respect to Flip-6. This will be one of the tasks during the Flink 1.5
> release testing and is still pending.
>
> A Flink cluster can be run in two modes: session mode vs per-job mode. The
> former starts a cluster to which you can submit multiple jobs. The cluster
> shares the same ResourceManager and a Dispatcher which is responsible for
> spawning JobMasters which execute a single job each. The latter starts a
> Flink cluster which is pre-initialized with a JobGraph and only runs this
> job. Here we also start a ResourceManager and a MiniDispatcher whose job it
> is to simply start a single JobMaster with the pre-initialized JobGraph.
>
> StandaloneSessionClusterEntrypoint is the entrypoint for the session mode.
>
> The JobClusterEntrypoint is the entrypoint for the per-job mode. Take a
> look at YarnJobClusterEntrypoint to see how the entrypoint retrieves the
> JobGraph from HDFS and then automatically starts executing it. There is no
> script which directly starts this entrypoint, but the YarnClusterDescriptor
> uses it when `deployJobCluster` is called.
>
> Depending on what you want to achieve - either building generic K8s images
> to which you can submit any number of Flink jobs, or having a special image
> which contains the single job you want to execute - you either have to call
> into the SessionClusterEntrypoint or the JobClusterEntrypoint. When
> starting a session cluster, you can use bin/flink run to submit a job
> to this cluster.
>
> Let me know if you have other questions.
>
> Cheers,
> Till
>
> On Thu, Mar 15, 2018 at 7:53 PM, Edward Rojas 
> wrote:
>
>> Hello,
>>
>> Currently I have a Flink 1.4 cluster running on kubernetes based on the
>> configuration describe on
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> ops/deployment/kubernetes.html
>> with additional config for HA with Zookeeper.
>>
>> With this I have several Taskmanagers, a single Jobmanager and I create a
>> container for each job to perform the Job submission and manage Job
>> updates
>> with savepoints.
>>
>>
>> I'm looking into what would be needed to migrate to the new architecture
>> on
>> FLIP6 as we are planning to use Flink 1.5 once it's ready.
>>
>> If I understand correctly from
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>> and the current code on master:
>>
>> * Taskmanagers would continue the same, i.e they will execute the
>> taskmanager.sh start-foreground  script, which with the flip6 mode
>> activated
>> will execute the new taskexecutor.TaskManagerRunner.
>>
>> * We will have now one Job Manager per Job which is really good; but I
>> don't
>> fully understand how this would be started.
>>
>> I notice that the jobmanager.sh with flip6 mode activated will execute
>> entrypoint.StandaloneSessionClusterEntrypoint but I don't see how we
>> could
>> pass the job jar and parameters (?)
>>
>> So I think the other possibility to start the job would be via the flink
>> run command with maybe an option to tell that we are creating a job with
>> job manager or would be this the default behaviour ?
>>
>> Or would be this the role of the JobMaster ? I didn't take a look to its
>> code but it's mentioned on the flip6 page. (however I don't see an
>> entrypoint from the scripts (?))
>>
>> Could you help me to understand how this is expected to be done ?
>>
>>
>> * Also I'm not sure to understand whether it would be better to have a
>> ResourceManager per job or a single ResourceManager per cluster, as in the
>> page is stated that there is a ResourceManager for
>> Self-contained-single-job, but it seems to me that it needs to have the
>> information about all JobManagers and TaskManagers (?)
>>
>>
>> Thanks in advance for the help you could provide.
>>
>> I'm interested in using Flip6 on kubernetes when it will be ready, so I
>> could help with some testing if needed.
>>
>> --
>> Edward
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: Migration to Flip6 Kubernetes

2018-03-21 Thread Edward Rojas
Hi Till,

Thanks for the information. We are using the session cluster and it is working
quite well, but we would like to benefit from the new per-job mode
approach in order to have better control over the jobs that are running on the
cluster.

I took a look at the YarnJobClusterEntrypoint and I see now how this is planned
to be done, but if I understand correctly, in the current state it is not
possible to start a job cluster on Kubernetes, as there are only concrete
implementations for YARN and Mesos?

The objective being to have a Flink cluster running in per-job mode and able
to execute several self-contained jobs, I imagine the idea would also be to
have a Kubernetes-specific implementation of the ResourceManager that would
be initialized alongside the TaskManagers and would listen for the
"self-contained jobs" to join, assign resources, and start the execution of
each specific job, each one with its own JobManager?

Is my understanding correct? 
Is the per-job mode on Kubernetes planned to be included in 1.5?

Regards,
Edward




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Standalone cluster instability

2018-03-21 Thread Alexander Smirnov
One more question - I see a lot of lines like the following in the logs:

[2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://
fl...@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
irrecoverably failed. Quarantining address. (akka.remote.Remoting)
[2018-03-21 00:34:15,208] WARN Association to [akka.tcp://
fl...@qafdsflinkw811.nn.five9lab.com:41068] with unknown UID is
irrecoverably failed. Address cannot be quarantined without knowing the
UID, gating instead for 5000 ms. (akka.remote.Remoting)
[2018-03-21 00:34:15,235] WARN Association to [akka.tcp://
fl...@qafdsflinkw811.nn.five9lab.com:40677] with unknown UID is
irrecoverably failed. Address cannot be quarantined without knowing the
UID, gating instead for 5000 ms. (akka.remote.Remoting)
[2018-03-21 00:34:15,256] WARN Association to [akka.tcp://
fl...@qafdsflinkw811.nn.five9lab.com:40382] with unknown UID is
irrecoverably failed. Address cannot be quarantined without knowing the
UID, gating instead for 5000 ms. (akka.remote.Remoting)
[2018-03-21 00:34:15,256] WARN Association to [akka.tcp://
fl...@qafdsflinkw811.nn.five9lab.com:44744] with unknown UID is
irrecoverably failed. Address cannot be quarantined without knowing the
UID, gating instead for 5000 ms. (akka.remote.Remoting)
[2018-03-21 00:34:15,266] WARN Association to [akka.tcp://
fl...@qafdsflinkw811.nn.five9lab.com:42413] with unknown UID is
irrecoverably failed. Address cannot be quarantined without knowing the
UID, gating instead for 5000 ms. (akka.remote.Remoting)


The host is available, but I don't understand where the port number comes from.
The TaskManager uses another port (which is printed in the logs on startup).
Could you please help me understand why this happens?

Thank you,
Alex


On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <
alexander.smirn...@gmail.com> wrote:

> Hello,
>
> I've assembled a standalone cluster of 3 task managers and 3 job
> managers (and 3 ZK) following the instructions at
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html
>  and
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html
>
> It works ok, but randomly, task managers become unavailable. The JobManager
> has exceptions like the one below in its logs:
>
>
> [2018-03-19 00:33:10,211] WARN Association with remote system [akka.tcp://
> fl...@qafdsflinkw811.nn.five9lab.com:42413] has failed, address is now
> gated for [5000] ms. Reason: [Association failed with [akka.tcp://
> fl...@qafdsflinkw811.nn.five9lab.com:42413]] Caused by: [Connection
> refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413]
> (akka.remote.ReliableDeliverySupervisor)
> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://
> fl...@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
> java.util.concurrent.TimeoutException: Remote system has been silent for
> too long. (more than 48.0 hours)
> at
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at
> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I can't find a reason for this exception, any ideas?
>
> Thank you,
> Alex
>


Re: Kafka ProducerFencedException after checkpointing

2018-03-21 Thread Piotr Nowojski
Hi,

But that’s exactly the case: producer’s transaction timeout starts when the 
external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka 
transaction for the whole period between checkpoints.

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional 
> downtime required for the successful job restart. Thus you should increase 
> your timeout accordingly.

I think that a 15 minute timeout is way too small a value. If your job fails 
because of some intermittent failure (for example a worker crash/restart), you 
will only have a couple of minutes for a successful Flink job restart. 
Otherwise you will lose some data (because of the transaction timeouts).
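
To make that concrete, here is a minimal sketch of wiring the producer timeout 
together with the checkpoint interval (the broker address, topic name and the 
1 hour value are just placeholders, not your actual settings; package locations 
of the serialization helpers may differ slightly between Flink versions). With 
a 10 minute checkpoint interval and checkpoints taking up to ~5 minutes, the 
transaction opened at the start of one checkpoint can stay open until the next 
checkpoint completes, i.e. roughly 10 + 5 = 15 minutes, so a 15 minute timeout 
leaves no margin at all for restarts:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceKafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10 * 60 * 1000); // checkpoint every 10 minutes

        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
        // Must cover checkpoint interval + checkpoint duration + restart downtime.
        producerConfig.setProperty("transaction.timeout.ms", "3600000"); // e.g. 1 hour
        // The broker-side transaction.max.timeout.ms (15 minutes by default)
        // caps this value, so it usually has to be raised on the brokers too.

        DataStream<String> stream = env.fromElements("a", "b", "c"); // stand-in for the real pipeline

        stream.addSink(new FlinkKafkaProducer011<>(
                "output-topic", // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                producerConfig,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once Kafka sink sketch");
    }
}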

Piotrek

> On 21 Mar 2018, at 10:30, Dongwon Kim  wrote:
> 
> Hi Piotr,
> 
> Now my streaming pipeline is working without retries. 
> I decreased Flink's checkpoint interval from 15min to 10min as you suggested 
> [see screenshot_10min_ckpt.png].
> 
> I thought that producer's transaction timeout starts when the external 
> transaction starts.
> The truth is that Producer's transaction timeout starts after the last 
> external checkpoint is committed.
> Now that I have 15min for Producer's transaction timeout and 10min for 
> Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, 
> everything is working fine.
> Am I right?
> 
> Anyway thank you very much for the detailed explanation!
> 
> Best,
> 
> Dongwon
> 
> 
> 
> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski wrote:
> Hi,
> 
> Please increase transaction.timeout.ms to a 
> greater value or decrease Flink’s checkpoint interval; I’m pretty sure the 
> issue here is that those two values are overlapping. I think that’s even 
> visible on the screenshots. The first completed checkpoint started at 14:28:48 
> and ended at 14:30:43, while the second one started at 14:45:53 and ended at 
> 14:49:16. That gives you a minimal transaction duration of 15 minutes and 10 
> seconds, and a maximal transaction duration of 21 minutes.
> 
> In the HAPPY SCENARIO (without any failures or restarts), your timeout 
> should cover, with some safety margin, the period between the start of a 
> checkpoint and the end of the NEXT checkpoint, since this is the upper bound 
> on how long the transaction might be used. In your case that is at least 
> ~25 minutes.
> 
> On top of that, as described in the docs
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance),
> in case of failure, your timeout must also be able to cover the additional 
> downtime required for the successful job restart. Thus you should increase 
> your timeout accordingly. 
> 
> Piotrek
> 
> 
>> On 20 Mar 2018, at 11:58, Dongwon Kim wrote:
>> 
>> Hi Piotr,
>> 
>> We have set the producer's transaction.timeout.ms to 15 minutes and have used 
>> the default setting for the broker (15 mins).
>> As Flink's checkpoint interval is 15 minutes, it is not a situation where 
>> Kafka's timeout is smaller than Flink's checkpoint interval.
>> As our first checkpoint just takes 2 minutes, it seems like the transaction is 
>> not committed properly.
>> 
>> Best,
>> 
>> - Dongwon
>> 
>> 
>> 
>> 
>> 
>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski wrote:
>> Hi,
>> 
>> What’s your Kafka transaction timeout setting? Please check both the Kafka 
>> producer configuration (the transaction.timeout.ms property) and the Kafka 
>> broker configuration. 
>> The most likely cause of such an error message is that Kafka's timeout is 
>> smaller than Flink’s checkpoint interval and transactions are not committed 
>> quickly enough before the timeout occurs.
>> 
>> Piotrek
>> 
>>> On 17 Mar 2018, at 07:24, Dongwon Kim wrote:
>>> 
>>> 
>>> Hi,
>>> 
>>> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 
>>> 7th, ... checkpoints:
>>> --
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
>>> attempted an operation with an old epoch. Either there is a newer producer 
>>> with the same transactio

Standalone cluster instability

2018-03-21 Thread Alexander Smirnov
Hello,

I've assembled a standalone cluster of 3 task managers and 3 job
managers (and 3 ZK) following the instructions at

https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html
and
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html

It works ok, but randomly, task managers become unavailable. The JobManager
has exceptions like the one below in its logs:


[2018-03-19 00:33:10,211] WARN Association with remote system [akka.tcp://
fl...@qafdsflinkw811.nn.five9lab.com:42413] has failed, address is now
gated for [5000] ms. Reason: [Association failed with [akka.tcp://
fl...@qafdsflinkw811.nn.five9lab.com:42413]] Caused by: [Connection
refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413]
(akka.remote.ReliableDeliverySupervisor)
[2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://
fl...@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
irrecoverably failed. Quarantining address. (akka.remote.Remoting)
java.util.concurrent.TimeoutException: Remote system has been silent for
too long. (more than 48.0 hours)
at
akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at
akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I can't find a reason for this exception, any ideas?

Thank you,
Alex


Re: Apache Zookeeper vs Flink Zookeeper

2018-03-21 Thread Alexander Smirnov
Thanks Gary, appreciate your quick response

On Wed, Mar 21, 2018 at 2:33 PM Gary Yao  wrote:

> Hi Alex,
>
> You can use vanilla Apache ZooKeeper. The class FlinkZooKeeperQuorumPeer
> is only
> used if you start ZooKeeper via the provided script bin/zookeeper.sh.
> FlinkZooKeeperQuorumPeer does not add any functionality except creating
> ZooKeeper's myid file.
>
> Best,
> Gary
>
> On Wed, Mar 21, 2018 at 12:02 PM, Alexander Smirnov <
> alexander.smirn...@gmail.com> wrote:
>
>> Hi,
>>
>> For standalone cluster configuration, is it possible to use vanilla
>> Apache Zookeeper?
>>
>> I saw there's a wrapper around it in Flink -  FlinkZooKeeperQuorumPeer.
>> Is it mandatory to use it?
>>
>> Thank you,
>> Alex
>>
>
>


Re: Apache Zookeeper vs Flink Zookeeper

2018-03-21 Thread Gary Yao
Hi Alex,

You can use vanilla Apache ZooKeeper. The class FlinkZooKeeperQuorumPeer is
only
used if you start ZooKeeper via the provided script bin/zookeeper.sh.
FlinkZooKeeperQuorumPeer does not add any functionality except creating
ZooKeeper's myid file.

Best,
Gary

On Wed, Mar 21, 2018 at 12:02 PM, Alexander Smirnov <
alexander.smirn...@gmail.com> wrote:

> Hi,
>
> For standalone cluster configuration, is it possible to use vanilla Apache
> Zookeeper?
>
> I saw there's a wrapper around it in Flink -  FlinkZooKeeperQuorumPeer. Is
> it mandatory to use it?
>
> Thank you,
> Alex
>


Apache Zookeeper vs Flink Zookeeper

2018-03-21 Thread Alexander Smirnov
Hi,

For standalone cluster configuration, is it possible to use vanilla Apache
Zookeeper?

I saw there's a wrapper around it in Flink -  FlinkZooKeeperQuorumPeer. Is
it mandatory to use it?

Thank you,
Alex


Re: Strange behavior on filter, group and reduce DataSets

2018-03-21 Thread simone

Hi all,

an update: following Stephan's directives on how to diagnose the issue and 
making Person immutable, the problem does not occur.
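
For reference, this is roughly what the immutable variant of the type looks 
like (the field names are just illustrative, not our actual schema); note that 
without a no-argument constructor and setters Flink no longer treats the class 
as a POJO and falls back to its generic (Kryo) serializer:

public class Person {

    private final String name;
    private final int age;

    // All fields are set once in the constructor.
    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() { return name; }
    public int getAge() { return age; }

    // No setters: instances cannot be modified after construction,
    // so accidental object reuse cannot corrupt records.
}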


Simone.


On 20/03/2018 20:20, Stephan Ewen wrote:

To diagnose that, can you please check the following:

  - Change the Person data type to be immutable (final fields, no 
setters, set fields in constructor instead). Does that make the 
problem go away?


  - Change the Person data type to not be a POJO by adding a dummy 
field that is never used and does not have a getter/setter. Does 
that make the problem go away?


If either of those is the case, it must be a mutability bug somewhere, 
in either accidental object reuse or accidental serializer sharing.



On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske wrote:


Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.
Please have a look and add any detail that you think could help to
resolve the problem.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9031


2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:

Hi Fabian,

This simple code reproduces the behavior ->
https://github.com/xseris/Flink-test-union


Thanks, Simone.


On 19/03/2018 15:44, Fabian Hueske wrote:

Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is
replicated, filtered, reduced, and unioned.
There is nothing in between the filter and reduce, that could
cause incorrect behavior.

The good thing is, the optimizer seems to be fine. The bad
thing is, it is either the Flink runtime code or your functions.
Given that one plan produces good results, it might be the
Flink runtime code.

Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

Ah, thanks for the update!
I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

HI Simone,

Looking at the plan, I don't see why this should be
happening. The pseudo code looks fine as well.
Any chance that you can create a minimal program to
reproduce the problem?

Thanks,
Fabian

2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:

Hi Fabian,

reuse is not enabled. I attach the plan of the
execution.

Thanks,
Simone


On 19/03/2018 11:36, Fabian Hueske wrote:

Hi,

Union is actually a very simple operator (not
even an operator in Flink terms). It just merges
two inputs. There is no additional logic involved.
Therefore, it should also not emit records
before either of both ReduceFunctions sorted its
data.
Once the data has been sorted for the
ReduceFunction, the data is reduced and emitted
in a pipelined fashion, i.e., once the first
record is reduced, it is forwarded into the
MapFunction (passing the unioned inputs).
So it is not unexpected that the Map starts
processing before the ReduceFunction has terminated.

Did you enable object reuse [1]?
If yes, try to disable it. If you want to reuse
objects, you have to be careful in how you
implement your functions.
If no, can you share the plan
(ExecutionEnvironment.getExecutionPlan()) that
was generated for the program?
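
For illustration, a rough self-contained sketch of the pattern we are 
discussing (the types and values are made up, not the actual job), together 
with the object reuse switch and the call that produces the plan:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnionReduceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Object reuse is off by default; with it enabled, functions must not
        // hold on to or mutate the records they receive.
        env.getConfig().disableObjectReuse();

        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

        // Two filtered branches of the same input, each grouped and reduced...
        DataSet<Tuple2<String, Integer>> evens = input
                .filter(t -> t.f1 % 2 == 0)
                .groupBy(0)
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

        DataSet<Tuple2<String, Integer>> odds = input
                .filter(t -> t.f1 % 2 != 0)
                .groupBy(0)
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

        // ...then unioned and passed through a MapFunction, as in the job above.
        DataSet<String> result = evens.union(odds)
                .map(new MapFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public String map(Tuple2<String, Integer> value) {
                        return value.f0 + " -> " + value.f1;
                    }
                });

        // env.getExecutionPlan() would return the JSON plan asked for above;
        // print() triggers execution and shows the unioned result.
        result.print();
    }
}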

Thanks,
Fabian

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions





2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:

Any help on this? This thing is very
strange.. the "manual" union of the output of
the 2 datasets is different from the
flink-union of them..
Could it be a problem of the f

Re: Is Hadoop 3.0 integration planned?

2018-03-21 Thread Stephan Ewen
That is definitely a good thing to have. I would like to have a discussion
about how to approach that after 1.5 is released.

On Wed, Mar 21, 2018 at 5:39 AM, Jayant Ameta  wrote:

>
> Jayant Ameta
>


Re: Queryable State

2018-03-21 Thread Kostas Kloudas
Hi Vishal,

As Fabian said, queryable state is just a feature that exposes the state kept 
within Flink, and it is not meant to 
replace functionality that would otherwise be provided by a sink. The 
functionality will definitely evolve in the future, 
but for now there are no discussions about keeping the state of a job even 
after the job is done.
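
For completeness, reading such exposed state from an external application 
looks roughly like the sketch below (host, port, job id, key and state name 
are placeholders; the descriptor has to match the state that the job 
registered as queryable, e.g. via StateDescriptor#setQueryable or 
asQueryableState):

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class StateQuerySketch {
    public static void main(String[] args) throws Exception {
        // Host of one TaskManager and the queryable state proxy port (9069 by default).
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        // Must match the descriptor used for the queryable state inside the job.
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("rides-per-driver", Long.class);

        CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString("00000000000000000000000000000000"), // id of the running job
                "rides-per-driver",                                       // queryable state name
                42L,                                                      // key to look up
                BasicTypeInfo.LONG_TYPE_INFO,                             // key type
                descriptor);

        System.out.println("current value = " + future.get().value());
        client.shutdownAndWait();
    }
}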

To be able to serve that data even when the job is not running, with exactly-once 
semantics and all the guarantees provided by Flink, I would recommend
using an external sink.

Cheers,
Kostas

> On Mar 19, 2018, at 6:18 PM, Vishal Santoshi wrote:
> 
> Thank you. These do look like show stoppers for us.  But again thank you.
> 
> On Mon, Mar 19, 2018 at 10:31 AM, Fabian Hueske wrote:
> AFAIK, there have been discussions to replicate state among TMs to speed up 
> recovery (and improve availability).
> However, I'm not aware of plans to implement that.
> 
> I don't think serving state while a job is down has been considered yet.
> 
> 2018-03-19 15:17 GMT+01:00 Vishal Santoshi:
> Are there plans to address all or a few of the above, apart from the "JM LB not 
> possible" point, which seems understandable ? 
> 
> On Mon, Mar 19, 2018 at 9:58 AM, Fabian Hueske wrote:
> Queryable state is "just" an additional feature on regular keyed state. i.e., 
> the only difference is that you can read the state from an outside 
> application.
> Besides that it behaves exactly like regular application state
> 
> Queryable state is (at the moment) designed to be accessible if a job runs.
> If the job fails (and recovers) or is manually taken down for maintenance, 
> the state cannot be queried.
> It's not possible to put a load balancer in front of a JobManager. Only one 
> JM is the active master that maintains a running job.
> State is also not replicated. 
> 
> Best, Fabian
> 
> 
> 2018-03-19 14:24 GMT+01:00 Vishal Santoshi:
> Those are understandable. I am more interested in a few things ( and maybe 
> more that could be added ) 
> 
> * As far as I can understand, the JM is the SPOF. Does HA become a necessity ?
> * If there are 2 or more JMs, could we theoretically have a LB fronting them ? 
> That is, is it peer-to-peer access ( Cassandra style ) or a master-slave setup for JM 
> HA, specifically for Queryable access ( for Flink jobs it is master-slave ) 
> * Do we replicate state to other TMs for read optimization ( specifically to 
> avoid Hot Node issues ) ?
> * If the job goes down it seems the state is not accessible. What plans do we 
> have to "separate concerns" for Queryable state ?
> 
> We consider Queryable State a significant feature Flink provides and would do 
> the necessary legwork if there are certain gaps in it being truly 
> considered a Highly Available key-value store.
> 
> Regards.
> 
> 
> 
>  
> 
> On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske wrote:
> Hi Vishal,
> 
> In general, Queryable State should be ready to use. 
> There are a few things to consider though:
> 
> - State queries are not synchronized with the application code, i.e., they 
> can happen at the same time. Therefore, the Flink application should not 
> modify objects that have been put into or read from the state if you are not 
> using the RocksDBStateBackend (which creates copies by deserialization).
> - State will be rolled back after a failure. Hence, you can read writes that 
> are not "committed by a checkpoint". 
> 
> @Kostas, did I forget something?
> 
> Best, Fabian
> 
> 
> 
> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi:
> To be more precise, can anything similar to 
> https://engineering.linkedin.com/blog/2018/03/air-traffic-controller--member-first-notifications-at-linkedin
> , done in Samza, be replicated with production level guarantees with 
> Flink Queryable state ( as it stands currently, version 1.5 ) ? 
> 
> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi wrote:
> We are making a few decisions on use cases where Queryable State is a natural 
> fit: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
> 
> Is Queryable State production ready ? We will go to Flink 1.5 if that helps to 
> make the case for the usage.
> 
> 
> 
> 
> 
> 
>